Hi, thanks for testing this patch!

On Thu Oct 23, 2025 at 6:49 AM -03, jian he wrote:
> On Thu, Oct 23, 2025 at 8:01 AM Matheus Alcantara
> <[email protected]> wrote:
>>
>> Please see the attached v3 version that implements this idea.
>>
> hi.
>
> I am not famailith with this module.
> some of the foreach can be replaced with foreach_int.
>
Fixed.

> I suspect that somewhere Form_pg_attribute.attisdropped is not handled 
> properly.
> the following setup will crash.
>
> ---source database
> drop table batch_table1;
> create table batch_table1(x int);
>
> ---foreign table database
> drop foreign table if exists ftable1;
> CREATE FOREIGN TABLE ftable1 ( x int ) SERVER loopback1 OPTIONS (
> table_name 'batch_table1', batch_size '10' );
> ALTER FOREIGN TABLE ftable1 DROP COLUMN x;
> ALTER FOREIGN TABLE ftable1 add COLUMN x int;
>
> INSERT INTO ftable SELECT * FROM generate_series(1, 10) i; --- this
> will cause server crash.
>
I've tested this scenario and the attisdropped field is still set to
false. After some debugging I realized that the problem was how I was
indexing the fmstate->p_flinfo array: I was using attnum - 1, which I
don't think is correct.

On create_foreign_modify() we have the following code:

fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
fmstate->p_nums = 0;
if (operation == CMD_INSERT || operation == CMD_UPDATE)
{
    /* Set up for remaining transmittable parameters */
    foreach(lc, fmstate->target_attrs)
    {
        int                     attnum = lfirst_int(lc);
        Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);

        Assert(!attr->attisdropped);

        /* Ignore generated columns; they are set to DEFAULT */
        if (attr->attgenerated)
            continue;
        getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
        fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
        fmstate->p_nums++;
    }
}

So I think fmstate->p_flinfo should be indexed, while looping through
target_attrs, with a separate counter that starts at 0 and is incremented
only for the columns that get an entry (i.e. the non-generated ones), just
as p_nums is advanced above. I'm not sure my understanding is fully
correct, but I've implemented this in the attached patch and it seems to
fix the crash.

In this new version I also added some regression tests to postgres_fdw.sql.

--
Matheus Alcantara
From 1856fba0c49d5aa7b164debb90b376c72cfa3e02 Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <[email protected]>
Date: Fri, 10 Oct 2025 16:07:08 -0300
Subject: [PATCH v4] postgres_fdw: Use COPY to speed up batch inserts

---
 contrib/postgres_fdw/deparse.c                |  30 ++++
 .../postgres_fdw/expected/postgres_fdw.out    | 120 +++++++++++--
 contrib/postgres_fdw/postgres_fdw.c           | 159 +++++++++++++++++-
 contrib/postgres_fdw/postgres_fdw.h           |   1 +
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  52 ++++++
 5 files changed, 350 insertions(+), 12 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index f2fb0051843..113e6fb7d91 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -2236,6 +2236,55 @@ rebuildInsertSql(StringInfo buf, Relation rel,
        appendStringInfoString(buf, orig_query + values_end_len);
 }
 
+/*
+ * buildCopySql
+ *             Build a COPY ... FROM STDIN statement (default TEXT format) used
+ *             to send batched inserts to the remote server.
+ *
+ * The column list follows target_attrs, skipping generated columns; columns
+ * left out of the list receive their defaults on the remote side.  Each
+ * column is named by its column_name FDW option when one is set, otherwise
+ * by its local attribute name.
+ */
+void
+buildCopySql(StringInfo buf, Relation rel, List *target_attrs)
+{
+       TupleDesc       tupdesc = RelationGetDescr(rel);
+       Oid                     relid = RelationGetRelid(rel);
+       bool            first = true;
+
+       appendStringInfoString(buf, "COPY ");
+       deparseRelation(buf, rel);
+       appendStringInfoChar(buf, '(');
+
+       foreach_int(attnum, target_attrs)
+       {
+               Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+               char       *colname = NameStr(attr->attname);
+
+               /* Ignore generated columns; they are set to DEFAULT */
+               if (attr->attgenerated)
+                       continue;
+
+               /* Use the column_name FDW option, if any, as the remote name */
+               foreach_node(DefElem, def, GetForeignColumnOptions(relid, attnum))
+               {
+                       if (strcmp(def->defname, "column_name") == 0)
+                       {
+                               colname = defGetString(def);
+                               break;
+                       }
+               }
+
+               if (!first)
+                       appendStringInfoString(buf, ", ");
+               first = false;
+
+               appendStringInfoString(buf, quote_identifier(colname));
+       }
+       appendStringInfoString(buf, ") FROM STDIN");
+}
+
 /*
  * deparse remote UPDATE statement
  *
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out 
b/contrib/postgres_fdw/expected/postgres_fdw.out
index cd28126049d..dd507ad6186 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -50,6 +50,18 @@ CREATE TABLE "S 1"."T 4" (
        c3 text,
        CONSTRAINT t4_pkey PRIMARY KEY (c1)
 );
+CREATE TABLE "S 1"."T 5"(
+    x int
+);
+CREATE TABLE "S 1"."T 6"(
+    id int not null,
+    note text,
+    value int NOT NULL
+);
+CREATE TABLE "S 1"."T 7"(
+    id int,
+    t text
+);
 -- Disable autovacuum for these tables to avoid unexpected effects of that
 ALTER TABLE "S 1"."T 1" SET (autovacuum_enabled = 'false');
 ALTER TABLE "S 1"."T 2" SET (autovacuum_enabled = 'false');
@@ -132,6 +144,21 @@ CREATE FOREIGN TABLE ft7 (
        c2 int NOT NULL,
        c3 text
 ) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4');
+CREATE FOREIGN TABLE ft8 (
+    x int
+)
+SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 5', batch_size '10');
+CREATE FOREIGN TABLE ft9 (
+    id int not null,
+    note text,
+    value int NOT NULL
+)
+SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 6', batch_size '10');
+CREATE FOREIGN TABLE ft10 (
+    id int,
+    t text
+)
+SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 7', batch_size '10');
 -- ===================================================================
 -- tests for validator
 -- ===================================================================
@@ -205,16 +232,19 @@ ALTER FOREIGN TABLE ft2 OPTIONS (schema_name 'S 1', 
table_name 'T 1');
 ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
 ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
 \det+
-                              List of foreign tables
- Schema | Table |  Server   |              FDW options              | 
Description 
---------+-------+-----------+---------------------------------------+-------------
- public | ft1   | loopback  | (schema_name 'S 1', table_name 'T 1') | 
- public | ft2   | loopback  | (schema_name 'S 1', table_name 'T 1') | 
- public | ft4   | loopback  | (schema_name 'S 1', table_name 'T 3') | 
- public | ft5   | loopback  | (schema_name 'S 1', table_name 'T 4') | 
- public | ft6   | loopback2 | (schema_name 'S 1', table_name 'T 4') | 
- public | ft7   | loopback3 | (schema_name 'S 1', table_name 'T 4') | 
-(6 rows)
+                                      List of foreign tables
+ Schema | Table |  Server   |                      FDW options                 
      | Description 
+--------+-------+-----------+--------------------------------------------------------+-------------
+ public | ft1   | loopback  | (schema_name 'S 1', table_name 'T 1')            
      | 
+ public | ft10  | loopback  | (schema_name 'S 1', table_name 'T 7', batch_size 
'10') | 
+ public | ft2   | loopback  | (schema_name 'S 1', table_name 'T 1')            
      | 
+ public | ft4   | loopback  | (schema_name 'S 1', table_name 'T 3')            
      | 
+ public | ft5   | loopback  | (schema_name 'S 1', table_name 'T 4')            
      | 
+ public | ft6   | loopback2 | (schema_name 'S 1', table_name 'T 4')            
      | 
+ public | ft7   | loopback3 | (schema_name 'S 1', table_name 'T 4')            
      | 
+ public | ft8   | loopback  | (schema_name 'S 1', table_name 'T 5', batch_size 
'10') | 
+ public | ft9   | loopback  | (schema_name 'S 1', table_name 'T 6', batch_size 
'10') | 
+(9 rows)
 
 -- Test that alteration of server options causes reconnection
 -- Remote's errors might be non-English, so hide them to ensure stable results
@@ -12664,6 +12694,76 @@ ANALYZE analyze_ftable;
 -- cleanup
 DROP FOREIGN TABLE analyze_ftable;
 DROP TABLE analyze_table;
+-- ===================================================================
+-- test for batch insert using COPY
+-- ===================================================================
+ALTER FOREIGN TABLE ft8 DROP COLUMN x;
+ALTER FOREIGN TABLE ft8 add COLUMN x int;
+INSERT INTO ft8 SELECT * FROM generate_series(1, 10) i;
+SELECT * FROM ft8;
+ x  
+----
+  1
+  2
+  3
+  4
+  5
+  6
+  7
+  8
+  9
+ 10
+(10 rows)
+
+EXPLAIN(ANALYZE, VERBOSE, COSTS OFF, SUMMARY OFF, BUFFERS OFF, TIMING OFF) 
INSERT INTO ft9 (id, value, note)
+SELECT g,
+       g * 2,
+       'batch insert test data' || g
+FROM generate_series(1, 20) g;
+                                   QUERY PLAN                                  
  
+---------------------------------------------------------------------------------
+ Insert on public.ft9 (actual rows=0.00 loops=1)
+   Remote SQL: COPY "S 1"."T 6"(id, note, value) FROM STDIN
+   Batch Size: 10
+   ->  Function Scan on pg_catalog.generate_series g (actual rows=20.00 
loops=1)
+         Output: g.g, ('batch insert test data'::text || (g.g)::text), (g.g * 
2)
+         Function Call: generate_series(1, 20)
+(6 rows)
+
+SELECT * FROM ft9;
+ id |           note           | value 
+----+--------------------------+-------
+  1 | batch insert test data1  |     2
+  2 | batch insert test data2  |     4
+  3 | batch insert test data3  |     6
+  4 | batch insert test data4  |     8
+  5 | batch insert test data5  |    10
+  6 | batch insert test data6  |    12
+  7 | batch insert test data7  |    14
+  8 | batch insert test data8  |    16
+  9 | batch insert test data9  |    18
+ 10 | batch insert test data10 |    20
+ 11 | batch insert test data11 |    22
+ 12 | batch insert test data12 |    24
+ 13 | batch insert test data13 |    26
+ 14 | batch insert test data14 |    28
+ 15 | batch insert test data15 |    30
+ 16 | batch insert test data16 |    32
+ 17 | batch insert test data17 |    34
+ 18 | batch insert test data18 |    36
+ 19 | batch insert test data19 |    38
+ 20 | batch insert test data20 |    40
+(20 rows)
+
+-- Test buffer limit of copy data on COPYBUFSIZ
+INSERT INTO ft10 (id, t)
+SELECT s, repeat(md5(s::text), 10000) from generate_series(100, 103) s;
+SELECT COUNT(*) FROM ft10;
+ count 
+-------
+     4
+(1 row)
+
 -- ===================================================================
 -- test for postgres_fdw_get_connections function with check_conn = true
 -- ===================================================================
diff --git a/contrib/postgres_fdw/postgres_fdw.c 
b/contrib/postgres_fdw/postgres_fdw.c
index 456b267f70b..d8e13e78938 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -63,6 +63,9 @@ PG_MODULE_MAGIC_EXT(
 /* If no remote estimates, assume a sort costs 20% extra */
 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
 
+/* Flush buffered COPY FROM STDIN data once it reaches this many bytes */
+#define COPYBUFSIZ 8192
+
 /*
  * Indexes of FDW-private information stored in fdw_private lists.
  *
@@ -192,6 +195,7 @@ typedef struct PgFdwModifyState
        /* extracted fdw_private data */
        char       *query;                      /* text of INSERT/UPDATE/DELETE 
command */
        char       *orig_query;         /* original text of INSERT command */
+       char       *copy_query;         /* text of COPY command if it's being 
used */
        List       *target_attrs;       /* list of target attribute numbers */
        int                     values_end;             /* length up to the end 
of VALUES */
        int                     batch_size;             /* value of FDW option 
"batch_size" */
@@ -545,6 +549,9 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
                                                          const 
PgFdwRelationInfo *fpinfo_o,
                                                          const 
PgFdwRelationInfo *fpinfo_i);
 static int     get_batch_size_option(Relation rel);
+static TupleTableSlot **execute_foreign_insert_using_copy(PgFdwModifyState 
*fmstate,
+                                                                               
                                  TupleTableSlot **slots,
+                                                                               
                                  int *numSlots);
 
 
 /*
@@ -2942,8 +2949,23 @@ postgresExplainForeignModify(ModifyTableState *mtstate,
 {
        if (es->verbose)
        {
-               char       *sql = strVal(list_nth(fdw_private,
-                                                                               
  FdwModifyPrivateUpdateSql));
+               char       *sql = NULL;
+
+               /*
+                * We only have ri_FdwState during EXPLAIN(ANALYZE), so check 
if the
+                * COPY was used during query execution and show it as a Remote 
SQL.
+                */
+               if (rinfo->ri_FdwState != NULL)
+               {
+                       PgFdwModifyState *fmstate = (PgFdwModifyState *) 
rinfo->ri_FdwState;
+
+                       if (fmstate->copy_query != NULL)
+                               sql = fmstate->copy_query;
+               }
+
+               if (sql == NULL)
+                       sql = strVal(list_nth(fdw_private,
+                                                                 
FdwModifyPrivateUpdateSql));
 
                ExplainPropertyText("Remote SQL", sql, es);
 
@@ -4066,6 +4088,95 @@ create_foreign_modify(EState *estate,
        return fmstate;
 }
 
+/*
+ * append_copy_text_escaped
+ *             Append str to buf, escaped for COPY TEXT format.
+ *
+ * Backslash, newline, carriage return and the tab delimiter are written as
+ * backslash sequences so the remote COPY reads them as data rather than as
+ * row or column separators.  Doubling backslashes also keeps a literal "\N"
+ * value from being read back as NULL.
+ */
+static void
+append_copy_text_escaped(StringInfo buf, const char *str)
+{
+       for (const char *p = str; *p != '\0'; p++)
+       {
+               switch (*p)
+               {
+                       case '\\':
+                               appendStringInfoString(buf, "\\\\");
+                               break;
+                       case '\n':
+                               appendStringInfoString(buf, "\\n");
+                               break;
+                       case '\r':
+                               appendStringInfoString(buf, "\\r");
+                               break;
+                       case '\t':
+                               appendStringInfoString(buf, "\\t");
+                               break;
+                       default:
+                               appendStringInfoCharMacro(buf, *p);
+                               break;
+               }
+       }
+}
+
+/*
+ * convert_slot_to_copy_text
+ *             Append one row of COPY FROM STDIN data (TEXT format) to buf.
+ *
+ * Values are emitted in fmstate->target_attrs order, skipping generated
+ * columns, to match the column list built by buildCopySql().  Each value is
+ * converted with its output function from fmstate->p_flinfo and escaped;
+ * fields are separated by tabs, NULLs are written as \N, and the row ends
+ * with a newline.
+ */
+static void
+convert_slot_to_copy_text(StringInfo buf,
+                                                 PgFdwModifyState *fmstate,
+                                                 TupleTableSlot *slot)
+{
+       TupleDesc       tupdesc = RelationGetDescr(fmstate->rel);
+       bool            first = true;
+       int                     pindex = 0;
+
+       foreach_int(attnum, fmstate->target_attrs)
+       {
+               CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1);
+               Datum           datum;
+               bool            isnull;
+
+               /* Ignore generated columns; they are set to DEFAULT */
+               if (attr->attgenerated)
+                       continue;
+
+               if (!first)
+                       appendStringInfoCharMacro(buf, '\t');
+               first = false;
+
+               datum = slot_getattr(slot, attnum, &isnull);
+
+               if (isnull)
+                       appendStringInfoString(buf, "\\N");
+               else
+               {
+                       /*
+                        * p_flinfo is indexed by parameter position, which advances
+                        * only for non-generated target columns, not by attnum.
+                        */
+                       char       *value = OutputFunctionCall(&fmstate->p_flinfo[pindex],
+                                                                                                  datum);
+
+                       append_copy_text_escaped(buf, value);
+               }
+               pindex++;
+       }
+
+       appendStringInfoCharMacro(buf, '\n');
+}
+
 /*
  * execute_foreign_modify
  *             Perform foreign-table modification as required, and fetch 
RETURNING
@@ -4097,6 +4163,13 @@ execute_foreign_modify(EState *estate,
        if (fmstate->conn_state->pendingAreq)
                process_pending_request(fmstate->conn_state->pendingAreq);
 
+       /*
+        * Use COPY command for batch insert if the original query don't 
include a
+        * RETURNING clause
+        */
+       if (operation == CMD_INSERT && *numSlots > 1 && !fmstate->has_returning)
+               return execute_foreign_insert_using_copy(fmstate, slots, 
numSlots);
+
        /*
         * If the existing query was deparsed and prepared for a different 
number
         * of rows, rebuild it for the proper number.
@@ -7886,3 +7959,101 @@ get_batch_size_option(Relation rel)
 
        return batch_size;
 }
+
+/*
+ * execute_foreign_insert_using_copy
+ *             Insert a batch of rows into the foreign table by running
+ *             COPY ... FROM STDIN on the remote server instead of INSERT.
+ *
+ * The COPY command is built on first use and cached in fmstate->copy_query.
+ * Rows are converted to COPY TEXT format and sent whenever the buffer reaches
+ * COPYBUFSIZ bytes, so the buffer does not grow with the size of the whole
+ * batch.  The conversion runs in fmstate->temp_cxt, which is reset before
+ * returning.
+ *
+ * On return *numSlots holds the number of rows the remote server reported as
+ * copied; the result is slots, or NULL if no row was inserted.
+ */
+static TupleTableSlot **
+execute_foreign_insert_using_copy(PgFdwModifyState *fmstate,
+                                                                 TupleTableSlot **slots,
+                                                                 int *numSlots)
+{
+       PGresult   *res;
+       StringInfoData copy_data;
+       MemoryContext oldcontext;
+       int                     n_rows;
+       int                     i;
+
+       if (fmstate->copy_query == NULL)
+       {
+               StringInfoData sql;
+
+               /*
+                * Build the COPY command and cache it for later batches.  This is
+                * done in the current context because temp_cxt is reset below.
+                */
+               initStringInfo(&sql);
+               buildCopySql(&sql, fmstate->rel, fmstate->target_attrs);
+               fmstate->copy_query = sql.data;
+       }
+
+       /* Send COPY command */
+       if (!PQsendQuery(fmstate->conn, fmstate->copy_query))
+               pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+
+       /* The server must now be ready to accept COPY data */
+       res = pgfdw_get_result(fmstate->conn);
+       if (PQresultStatus(res) != PGRES_COPY_IN)
+               pgfdw_report_error(res, fmstate->conn, fmstate->copy_query);
+       PQclear(res);
+
+       /*
+        * Convert rows in temp_cxt so that output-function results and the data
+        * buffer are freed when it is reset, instead of accumulating in the
+        * current context batch after batch.
+        */
+       oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
+
+       initStringInfo(&copy_data);
+       for (i = 0; i < *numSlots; i++)
+       {
+               convert_slot_to_copy_text(&copy_data, fmstate, slots[i]);
+
+               /* Send accumulated data once the buffer reaches COPYBUFSIZ */
+               if (copy_data.len >= COPYBUFSIZ)
+               {
+                       if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0)
+                               pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+                       resetStringInfo(&copy_data);
+               }
+       }
+
+       /* Send the remaining COPY data */
+       if (copy_data.len > 0 &&
+               PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0)
+               pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+
+       MemoryContextSwitchTo(oldcontext);
+
+       /* End the COPY operation */
+       if (PQputCopyEnd(fmstate->conn, NULL) < 0 || PQflush(fmstate->conn))
+               pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+
+       /* Get the result, and check for success */
+       res = pgfdw_get_result(fmstate->conn);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pgfdw_report_error(res, fmstate->conn, fmstate->copy_query);
+
+       n_rows = atoi(PQcmdTuples(res));
+
+       /* And clean up */
+       PQclear(res);
+
+       MemoryContextReset(fmstate->temp_cxt);
+
+       *numSlots = n_rows;
+
+       /* Return NULL if nothing was inserted on the remote end */
+       return (n_rows > 0) ? slots : NULL;
+}
diff --git a/contrib/postgres_fdw/postgres_fdw.h 
b/contrib/postgres_fdw/postgres_fdw.h
index e69735298d7..c0198b865f3 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel,
                                                         char *orig_query, List 
*target_attrs,
                                                         int values_end_len, 
int num_params,
                                                         int num_rows);
+extern void buildCopySql(StringInfo buf, Relation rel, List *target_attrs);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
                                                         Index rtindex, 
Relation rel,
                                                         List *targetAttrs,
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql 
b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 9a8f9e28135..79f4f305641 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -54,6 +54,18 @@ CREATE TABLE "S 1"."T 4" (
        c3 text,
        CONSTRAINT t4_pkey PRIMARY KEY (c1)
 );
+CREATE TABLE "S 1"."T 5"(
+    x int
+);
+CREATE TABLE "S 1"."T 6"(
+    id int not null,
+    note text,
+    value int NOT NULL
+);
+CREATE TABLE "S 1"."T 7"(
+    id int,
+    t text
+);
 
 -- Disable autovacuum for these tables to avoid unexpected effects of that
 ALTER TABLE "S 1"."T 1" SET (autovacuum_enabled = 'false');
@@ -146,6 +158,24 @@ CREATE FOREIGN TABLE ft7 (
        c3 text
 ) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4');
 
+CREATE FOREIGN TABLE ft8 (
+    x int
+)
+SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 5', batch_size '10');
+
+CREATE FOREIGN TABLE ft9 (
+    id int not null,
+    note text,
+    value int NOT NULL
+)
+SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 6', batch_size '10');
+
+CREATE FOREIGN TABLE ft10 (
+    id int,
+    t text
+)
+SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 7', batch_size '10');
+
 -- ===================================================================
 -- tests for validator
 -- ===================================================================
@@ -4379,6 +4409,28 @@ ANALYZE analyze_ftable;
 DROP FOREIGN TABLE analyze_ftable;
 DROP TABLE analyze_table;
 
+-- ===================================================================
+-- test for batch insert using COPY
+-- ===================================================================
+ALTER FOREIGN TABLE ft8 DROP COLUMN x;
+ALTER FOREIGN TABLE ft8 add COLUMN x int;
+
+INSERT INTO ft8 SELECT * FROM generate_series(1, 10) i;
+SELECT * FROM ft8;
+
+EXPLAIN(ANALYZE, VERBOSE, COSTS OFF, SUMMARY OFF, BUFFERS OFF, TIMING OFF) 
INSERT INTO ft9 (id, value, note)
+SELECT g,
+       g * 2,
+       'batch insert test data' || g
+FROM generate_series(1, 20) g;
+
+SELECT * FROM ft9;
+
+-- Test buffer limit of copy data on COPYBUFSIZ
+INSERT INTO ft10 (id, t)
+SELECT s, repeat(md5(s::text), 10000) from generate_series(100, 103) s;
+SELECT COUNT(*) FROM ft10;
+
 -- ===================================================================
 -- test for postgres_fdw_get_connections function with check_conn = true
 -- ===================================================================
-- 
2.51.0

Reply via email to