On Tue Oct 21, 2025 at 11:25 AM -03, Matheus Alcantara wrote:
>>> Lastly, I don't know if we should change the EXPLAIN(ANALYZE, VERBOSE)
>>> output for batch inserts that use the COPY to mention that we are
>>> sending the COPY command to the remote server. I guess so?
>>>
>>
>> Good point. We definitely should not show SQL for INSERT, when we're
>> actually running a COPY.
>>
> This seems a bit tricky to implement. COPY is used based on the
> number of slots in the TupleTableSlot array that is used for the batch
> insert. The numSlots that execute_foreign_modify() receives comes
> from ResultRelInfo->ri_NumSlots during ExecInsert(). We don't have this
> information during EXPLAIN, which is handled by
> postgresExplainForeignModify(); we only have
> ResultRelInfo->ri_BatchSize at this stage. The current idea is to use
> the COPY command if the number of slots is > 1, so I'm wondering if we
> should use another mechanism to enable COPY. For example, we
> could use it only if batch_size is configured to a number greater than
> X. But what if the INSERT statement inserts only a single row?
> Should we still use the COPY command to ingest a single row into the
> foreign table? Any thoughts?
>
Thinking more about this, I realize that when we deparse the remote
SQL to be sent to the foreign server in the planner phase (via
postgresPlanForeignModify()) we have neither the batch_size nor the
number of rows, so currently we cannot know at plan time whether COPY
can be used for a batch insert, because IIUC this information is only
available at query runtime.

One way to make it possible is to simply use
PgFdwModifyState->copy_query during postgresExplainForeignModify() if
it's not NULL. Since we only have this information during query
execution, the drawback of this approach is that we would only show the
COPY as the Remote SQL for EXPLAIN(ANALYZE).
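
To make the fallback concrete, here is a minimal standalone sketch of the
selection logic. ModifyStateSketch and remote_sql_for_explain() are
hypothetical names used only for illustration; they stand in for
PgFdwModifyState and the relevant part of postgresExplainForeignModify(),
and are not the code in the attached patch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the part of PgFdwModifyState that matters here. */
typedef struct ModifyStateSketch
{
    const char *copy_query;     /* set only once a COPY was actually sent */
} ModifyStateSketch;

/*
 * Pick the text to report as "Remote SQL".
 *
 * fdw_state is NULL for a plain EXPLAIN, because nothing was executed.
 * With EXPLAIN(ANALYZE) it is non-NULL, and copy_query is non-NULL only
 * if at least one batch went through the COPY path.
 */
static const char *
remote_sql_for_explain(const ModifyStateSketch *fdw_state,
                       const char *planned_sql)
{
    assert(planned_sql != NULL);

    if (fdw_state != NULL && fdw_state->copy_query != NULL)
        return fdw_state->copy_query;

    /* Fall back to the INSERT text deparsed at plan time. */
    return planned_sql;
}
```

Plain EXPLAIN always reaches the fallback, which is the drawback described
above.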

Please see the attached v3 version that implements this idea.

> I tried to reuse the fmstate->query field to cache the COPY SQL, but
> running the postgres_fdw.sql regression test shows that this may not
> work. When we are running a user-supplied COPY command on a foreign
> table, CopyMultiInsertBufferFlush() calls
> ri_FdwRoutine->ExecForeignBatchInsert, which may pass different values
> for numSlots based on the number of slots already sent to the foreign
> server. Eventually it may pass numSlots as 1, which will not use
> COPY under the hood. If we cache the COPY command in fmstate->query,
> this will not work, because the normal INSERT path in
> execute_foreign_modify() uses fmstate->query to build a prepared
> statement to send to the foreign server. So basically what I'm trying
> to say is that when the server is executing a COPY into a foreign
> table, it may use either the COPY command or the INSERT command to
> send the data to the foreign server. That being said, I decided to
> create a new copy_query field on PgFdwModifyState to cache only COPY
> commands. Please let me know if my understanding is wrong or if we
> could have a better approach here.
>
Based on the information that I've mentioned above, I think that we need
some way to avoid mixing INSERT and COPY commands when executing a
user-supplied COPY into a foreign table. Either we should disable the
COPY under the hood and always fall back to INSERT, or we should use
COPY even when *numSlots is 1, so that the EXPLAIN(ANALYZE) output can
show the Remote SQL correctly. Does that make sense?
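
To illustrate how the two commands get mixed, here is a small standalone
sketch. choose_remote_command() mirrors the condition used in v3;
count_flushes() is a hypothetical simplification of how a caller might
hand rows over in chunks of at most batch_size, not the actual
CopyMultiInsertBufferFlush() logic.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum RemoteCommand
{
    REMOTE_INSERT,
    REMOTE_COPY
} RemoteCommand;

/* Same condition as v3: COPY only for multi-row inserts without RETURNING. */
static RemoteCommand
choose_remote_command(int num_slots, bool has_returning)
{
    assert(num_slots >= 1);
    return (num_slots > 1 && !has_returning) ? REMOTE_COPY : REMOTE_INSERT;
}

/*
 * Count how many flushes use each command when total_rows are sent in
 * chunks of at most batch_size rows (assumed chunking, for illustration).
 */
static void
count_flushes(int total_rows, int batch_size, int *n_copy, int *n_insert)
{
    assert(batch_size >= 1);

    *n_copy = 0;
    *n_insert = 0;

    while (total_rows > 0)
    {
        int n = (total_rows < batch_size) ? total_rows : batch_size;

        if (choose_remote_command(n, false) == REMOTE_COPY)
            (*n_copy)++;
        else
            (*n_insert)++;

        total_rows -= n;
    }
}
```

With 5 rows and chunks of 2, this yields two COPY flushes and one INSERT
flush, so a single Remote SQL string cannot describe what was actually
sent.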

I'm still not sure whether the trigger for using the COPY command for
batch inserts should be *numSlots > 1 or something else. I'm open to
better ideas.

Thoughts?

--
Matheus Alcantara

From 0175f829cc2944eb596f18d701b64c2d90d8e2bb Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <[email protected]>
Date: Fri, 10 Oct 2025 16:07:08 -0300
Subject: [PATCH v3] postgres_fdw: Use COPY to speed up batch inserts

---
 contrib/postgres_fdw/deparse.c      |  32 ++++++
 contrib/postgres_fdw/postgres_fdw.c | 159 +++++++++++++++++++++++++++-
 contrib/postgres_fdw/postgres_fdw.h |   1 +
 3 files changed, 190 insertions(+), 2 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index f2fb0051843..afd1cc636d7 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -2236,6 +2236,38 @@ rebuildInsertSql(StringInfo buf, Relation rel,
        appendStringInfoString(buf, orig_query + values_end_len);
 }
 
+/*
+ *  Build a COPY FROM STDIN statement using the TEXT format
+ */
+void
+buildCopySql(StringInfo buf, Relation rel, List *target_attrs)
+{
+       ListCell   *lc;
+       TupleDesc       tupdesc = RelationGetDescr(rel);
+       bool            first = true;
+
+       appendStringInfo(buf, "COPY ");
+       deparseRelation(buf, rel);
+       appendStringInfo(buf, "(");
+
+       foreach(lc, target_attrs)
+       {
+               int                     attnum = lfirst_int(lc);
+               Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+
+               if (attr->attgenerated)
+                       continue;
+
+               if (!first)
+                       appendStringInfoString(buf, ", ");
+
+               first = false;
+
+               appendStringInfoString(buf, quote_identifier(NameStr(attr->attname)));
+       }
+       appendStringInfoString(buf, ") FROM STDIN");
+}
+
 /*
  * deparse remote UPDATE statement
  *
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 456b267f70b..0ccbff8e390 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -63,6 +63,9 @@ PG_MODULE_MAGIC_EXT(
 /* If no remote estimates, assume a sort costs 20% extra */
 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
 
+/* Buffer size to send COPY IN data */
+#define COPYBUFSIZ 8192
+
 /*
  * Indexes of FDW-private information stored in fdw_private lists.
  *
@@ -192,6 +195,7 @@ typedef struct PgFdwModifyState
        /* extracted fdw_private data */
        char       *query;                      /* text of INSERT/UPDATE/DELETE command */
        char       *orig_query;         /* original text of INSERT command */
+       char       *copy_query;         /* text of COPY command if it's being used */
        List       *target_attrs;       /* list of target attribute numbers */
        int                     values_end;             /* length up to the end of VALUES */
        int                     batch_size;             /* value of FDW option "batch_size" */
@@ -545,6 +549,9 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
                                                          const PgFdwRelationInfo *fpinfo_o,
                                                          const PgFdwRelationInfo *fpinfo_i);
 static int     get_batch_size_option(Relation rel);
+static TupleTableSlot **execute_foreign_insert_using_copy(PgFdwModifyState *fmstate,
+                                                                                                                 TupleTableSlot **slots,
+                                                                                                                 int *numSlots);
 
 
 /*
@@ -2942,8 +2949,23 @@ postgresExplainForeignModify(ModifyTableState *mtstate,
 {
        if (es->verbose)
        {
-               char       *sql = strVal(list_nth(fdw_private,
-                                                                                 FdwModifyPrivateUpdateSql));
+               char       *sql = NULL;
+
+               /*
+                * We only have ri_FdwState during EXPLAIN(ANALYZE), so check if the
+                * COPY was used during query execution and show it as a Remote SQL.
+                */
+               if (rinfo->ri_FdwState != NULL)
+               {
+                       PgFdwModifyState *fmstate = (PgFdwModifyState *) rinfo->ri_FdwState;
+
+                       if (fmstate->copy_query != NULL)
+                               sql = fmstate->copy_query;
+               }
+
+               if (sql == NULL)
+                       sql = strVal(list_nth(fdw_private,
+                                                                 FdwModifyPrivateUpdateSql));
 
                ExplainPropertyText("Remote SQL", sql, es);
 
@@ -4066,6 +4088,50 @@ create_foreign_modify(EState *estate,
        return fmstate;
 }
 
+/*
+ *  Write target attribute values from fmstate into buf buffer to be sent as
+ *  COPY FROM STDIN data
+ */
+static void
+convert_slot_to_copy_text(StringInfo buf,
+                                                 PgFdwModifyState *fmstate,
+                                                 TupleTableSlot *slot)
+{
+       ListCell   *lc;
+       TupleDesc       tupdesc = RelationGetDescr(fmstate->rel);
+       bool            first = true;
+
+       foreach(lc, fmstate->target_attrs)
+       {
+               int                     attnum = lfirst_int(lc);
+               CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1);
+               Datum           datum;
+               bool            isnull;
+
+               /* Ignore generated columns; they are set to DEFAULT */
+               if (attr->attgenerated)
+                       continue;
+
+               if (!first)
+                       appendStringInfoCharMacro(buf, '\t');
+               first = false;
+
+               datum = slot_getattr(slot, attnum, &isnull);
+
+               if (isnull)
+                       appendStringInfoString(buf, "\\N");
+               else
+               {
+                       const char *value = OutputFunctionCall(&fmstate->p_flinfo[attnum - 1],
+                                                                                                  datum);
+
+                       appendStringInfoString(buf, value);
+               }
+       }
+
+       appendStringInfoCharMacro(buf, '\n');
+}
+
 /*
  * execute_foreign_modify
  *             Perform foreign-table modification as required, and fetch RETURNING
@@ -4097,6 +4163,13 @@ execute_foreign_modify(EState *estate,
        if (fmstate->conn_state->pendingAreq)
                process_pending_request(fmstate->conn_state->pendingAreq);
 
+       /*
+        * Use COPY command for batch insert if the original query doesn't include a
+        * RETURNING clause
+        */
+       if (operation == CMD_INSERT && *numSlots > 1 && !fmstate->has_returning)
+               return execute_foreign_insert_using_copy(fmstate, slots, numSlots);
+
        /*
         * If the existing query was deparsed and prepared for a different number
         * of rows, rebuild it for the proper number.
@@ -7886,3 +7959,85 @@ get_batch_size_option(Relation rel)
 
        return batch_size;
 }
+
+/*  Execute a batch insert into a foreign table using the COPY command */
+static TupleTableSlot **
+execute_foreign_insert_using_copy(PgFdwModifyState *fmstate,
+                                                                 TupleTableSlot **slots,
+                                                                 int *numSlots)
+{
+       PGresult   *res;
+       StringInfoData sql;
+       StringInfoData copy_data;
+       int                     n_rows;
+       int                     i;
+
+       if (fmstate->copy_query == NULL)
+       {
+               /* Build COPY command */
+               initStringInfo(&sql);
+               buildCopySql(&sql, fmstate->rel, fmstate->target_attrs);
+
+               /* Cache for reuse. */
+               fmstate->copy_query = sql.data;
+       }
+
+       /* Send COPY command */
+       if (!PQsendQuery(fmstate->conn, fmstate->copy_query))
+               pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+
+       /* get the COPY result */
+       res = pgfdw_get_result(fmstate->conn);
+       if (PQresultStatus(res) != PGRES_COPY_IN)
+               pgfdw_report_error(res, fmstate->conn, fmstate->copy_query);
+
+       /* Convert the TupleTableSlot data into a TEXT-formatted line */
+       initStringInfo(&copy_data);
+       for (i = 0; i < *numSlots; i++)
+       {
+               convert_slot_to_copy_text(&copy_data, fmstate, slots[i]);
+
+               /*
+                * Send initial COPY data if the buffer reaches the limit to avoid large
+                * memory usage.
+                */
+               if (copy_data.len >= COPYBUFSIZ)
+               {
+                       if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0)
+                               pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+                       resetStringInfo(&copy_data);
+               }
+       }
+
+       /* Send the remaining COPY data */
+       if (copy_data.len > 0)
+       {
+               if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0)
+                       pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+       }
+
+       /* End the COPY operation */
+       if (PQputCopyEnd(fmstate->conn, NULL) < 0 || PQflush(fmstate->conn))
+               pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+
+       /*
+        * Get the result, and check for success.
+        */
+       res = pgfdw_get_result(fmstate->conn);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pgfdw_report_error(res, fmstate->conn, fmstate->copy_query);
+
+       n_rows = atoi(PQcmdTuples(res));
+
+       /* And clean up */
+       PQclear(res);
+
+       MemoryContextReset(fmstate->temp_cxt);
+
+       *numSlots = n_rows;
+
+       /*
+        * Return NULL if nothing was inserted on the remote end
+        */
+       return (n_rows > 0) ? slots : NULL;
+}
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index e69735298d7..c0198b865f3 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel,
                                                         char *orig_query, List *target_attrs,
                                                         int values_end_len, int num_params,
                                                         int num_rows);
+extern void buildCopySql(StringInfo buf, Relation rel, List *target_attrs);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
                                                         Index rtindex, Relation rel,
                                                         List *targetAttrs,
-- 
2.51.0
