On Tue Oct 21, 2025 at 11:25 AM -03, Matheus Alcantara wrote:
>>> Lastly, I don't know if we should change the EXPLAIN(ANALYZE, VERBOSE)
>>> output for batch inserts that use the COPY to mention that we are
>>> sending the COPY command to the remote server. I guess so?
>>>
>>
>> Good point. We definitely should not show SQL for INSERT, when we're
>> actually running a COPY.
>>
> This seems a bit tricky to implement. The COPY is used based on the
> number of slots into the TupleTableSlot array that is used for batch
> insert. The numSlots that execute_foreign_modify() receive is coming
> from ResultRelInfo->ri_NumSlots during ExecInsert(). We don't have this
> information during EXPLAIN that is handled by
> postgresExplainForeignModify(), we only have the
> ResultRelInfo->ri_BatchSize at this stage. The current idea is to use
> the COPY command if the number of slots is > 1 so I'm wondering if we
> should use another mechanism to enable the COPY usage, for example, we
> could just use if the batch_size is configured to a number greater than
> X, but what if the INSERT statement is only inserting a single row,
> should we still use the COPY command to ingest a single row into the
> foreign table? Any thoughts?
>

Thinking more about this, I realize that when we deparse the remote SQL
to be sent to the foreign server during planning (via
postgresPlanForeignModify()), we have neither the batch_size nor the
number of rows. So we currently cannot know at plan time whether a batch
insert will use COPY, because IIUC that information is only available at
query runtime.

One way to make this possible is to simply use
PgFdwModifyState->copy_query in postgresExplainForeignModify() when it is
not NULL. Since this information only exists during query execution, the
drawback of this approach is that COPY would only be shown as the Remote
SQL for EXPLAIN (ANALYZE). Please see the attached v3, which implements
this idea.

> I tried to reuse the fmstate->query field to cache the COPY sql but
> running the postgres_fdw.sql regress test shows that this may not
> work. When we are running a user supplied COPY command on a foreign
> table the CopyMultiInsertBufferFlush() call
> ri_FdwRoutine->ExecForeignBatchInsert which may pass different values
> for numSlots based on the number of slots already sent to the foreign
> server, and eventually it may pass numSlots as 1 which will not use the
> COPY under the hood to send to the foreign server and if we cache the
> COPY command into the fmstate->query this will not work because the
> normal INSERT path on execute_foreign_modify uses the fmstate->query to
> build a prepared statement to send to the foreign server. So basically
> what I'm trying to say is that when the server is executing a COPY into
> a foreign it may use the COPY command or INSERT command to send the data
> to the foreign server. That being said, I decided to create a new
> copy_query field on PgFdwModifyState to cache only COPY commands. Please
> let me know if my understanding is wrong or if we could have a better
> approach here.
>

Based on the above, I think we need some way to avoid mixing INSERT and
COPY commands when a user-supplied COPY is executed on a foreign table.
That is, either we disable the COPY under the hood and always fall back
to INSERT, or we also use COPY when *numSlots is 1, so that the
EXPLAIN (ANALYZE) output shows the Remote SQL correctly. Does that make
sense?

I'm still not sure whether the trigger for using COPY in batch inserts
should be *numSlots > 1 or something else. I'm open to better ideas.

Thoughts?

-- 
Matheus Alcantara
From 0175f829cc2944eb596f18d701b64c2d90d8e2bb Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <[email protected]>
Date: Fri, 10 Oct 2025 16:07:08 -0300
Subject: [PATCH v3] postgres_fdw: Use COPY to speed up batch inserts

---
 contrib/postgres_fdw/deparse.c      |  32 ++++++
 contrib/postgres_fdw/postgres_fdw.c | 159 +++++++++++++++++++++++++++-
 contrib/postgres_fdw/postgres_fdw.h |   1 +
 3 files changed, 190 insertions(+), 2 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index f2fb0051843..afd1cc636d7 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -2236,6 +2236,38 @@ rebuildInsertSql(StringInfo buf, Relation rel,
 	appendStringInfoString(buf, orig_query + values_end_len);
 }
 
+/*
+ * Build a COPY FROM STDIN statement using the TEXT format
+ */
+void
+buildCopySql(StringInfo buf, Relation rel, List *target_attrs)
+{
+	ListCell   *lc;
+	TupleDesc	tupdesc = RelationGetDescr(rel);
+	bool		first = true;
+
+	appendStringInfoString(buf, "COPY ");
+	deparseRelation(buf, rel);
+	appendStringInfoChar(buf, '(');
+
+	foreach(lc, target_attrs)
+	{
+		int			attnum = lfirst_int(lc);
+		Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+
+		if (attr->attgenerated)
+			continue;
+
+		if (!first)
+			appendStringInfoString(buf, ", ");
+
+		first = false;
+
+		appendStringInfoString(buf, quote_identifier(NameStr(attr->attname)));
+	}
+	appendStringInfoString(buf, ") FROM STDIN");
+}
+
 /*
  * deparse remote UPDATE statement
  *
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 456b267f70b..0ccbff8e390 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -63,6 +63,9 @@ PG_MODULE_MAGIC_EXT(
 /* If no remote estimates, assume a sort costs 20% extra */
 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
 
+/* Buffer size to send COPY IN data */
+#define COPYBUFSIZ 8192
+
 /*
  * Indexes of FDW-private information stored in fdw_private lists.
  *
@@ -192,6 +195,7 @@ typedef struct PgFdwModifyState
 	/* extracted fdw_private data */
 	char	   *query;			/* text of INSERT/UPDATE/DELETE command */
 	char	   *orig_query;		/* original text of INSERT command */
+	char	   *copy_query;		/* text of COPY command if it's being used */
 	List	   *target_attrs;	/* list of target attribute numbers */
 	int			values_end;		/* length up to the end of VALUES */
 	int			batch_size;		/* value of FDW option "batch_size" */
@@ -545,6 +549,9 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
 							  const PgFdwRelationInfo *fpinfo_o,
 							  const PgFdwRelationInfo *fpinfo_i);
 static int	get_batch_size_option(Relation rel);
+static TupleTableSlot **execute_foreign_insert_using_copy(PgFdwModifyState *fmstate,
+														   TupleTableSlot **slots,
+														   int *numSlots);
 
 
 /*
@@ -2942,8 +2949,23 @@ postgresExplainForeignModify(ModifyTableState *mtstate,
 	{
 		if (es->verbose)
 		{
-			char	   *sql = strVal(list_nth(fdw_private,
-											 FdwModifyPrivateUpdateSql));
+			char	   *sql = NULL;
+
+			/*
+			 * We only have ri_FdwState during EXPLAIN(ANALYZE), so check if the
+			 * COPY was used during query execution and show it as a Remote SQL.
+			 */
+			if (rinfo->ri_FdwState != NULL)
+			{
+				PgFdwModifyState *fmstate = (PgFdwModifyState *) rinfo->ri_FdwState;
+
+				if (fmstate->copy_query != NULL)
+					sql = fmstate->copy_query;
+			}
+
+			if (sql == NULL)
+				sql = strVal(list_nth(fdw_private,
+									  FdwModifyPrivateUpdateSql));
 
 			ExplainPropertyText("Remote SQL", sql, es);
 
@@ -4066,6 +4088,50 @@ create_foreign_modify(EState *estate,
 	return fmstate;
 }
 
+/*
+ * Write target attribute values from fmstate into buf buffer to be sent as
+ * COPY FROM STDIN data
+ */
+static void
+convert_slot_to_copy_text(StringInfo buf,
+						  PgFdwModifyState *fmstate,
+						  TupleTableSlot *slot)
+{
+	ListCell   *lc;
+	TupleDesc	tupdesc = RelationGetDescr(fmstate->rel);
+	int			pindex = 0;
+
+	foreach(lc, fmstate->target_attrs)
+	{
+		int			attnum = lfirst_int(lc);
+		CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1);
+		Datum		datum;
+		bool		isnull;
+
+		/* Ignore generated columns; they are set to DEFAULT */
+		if (attr->attgenerated)
+			continue;
+
+		if (pindex > 0)
+			appendStringInfoCharMacro(buf, '\t');
+
+		datum = slot_getattr(slot, attnum, &isnull);
+
+		if (isnull)
+			appendStringInfoString(buf, "\\N");
+		else
+		{
+			const char *value = OutputFunctionCall(&fmstate->p_flinfo[pindex],
+												   datum);
+
+			appendStringInfoString(buf, value);
+		}
+		pindex++;
+	}
+
+	appendStringInfoCharMacro(buf, '\n');
+}
+
 /*
  * execute_foreign_modify
  *		Perform foreign-table modification as required, and fetch RETURNING
@@ -4097,6 +4163,13 @@ execute_foreign_modify(EState *estate,
 	if (fmstate->conn_state->pendingAreq)
 		process_pending_request(fmstate->conn_state->pendingAreq);
 
+	/*
+	 * Use the COPY command for batch inserts when the original query does
+	 * not include a RETURNING clause
+	 */
+	if (operation == CMD_INSERT && *numSlots > 1 && !fmstate->has_returning)
+		return execute_foreign_insert_using_copy(fmstate, slots, numSlots);
+
 	/*
 	 * If the existing query was deparsed and prepared for a different number
 	 * of rows, rebuild it for the proper number.
@@ -7886,3 +7959,85 @@ get_batch_size_option(Relation rel) return batch_size; } + +/* Execute a batch insert into a foreign table using the COPY command */ +static TupleTableSlot ** +execute_foreign_insert_using_copy(PgFdwModifyState *fmstate, + TupleTableSlot **slots, + int *numSlots) +{ + PGresult *res; + StringInfoData sql; + StringInfoData copy_data; + int n_rows; + int i; + + if (fmstate->copy_query == NULL) + { + /* Build COPY command */ + initStringInfo(&sql); + buildCopySql(&sql, fmstate->rel, fmstate->target_attrs); + + /* Cache for reuse. */ + fmstate->copy_query = sql.data; + } + + /* Send COPY command */ + if (!PQsendQuery(fmstate->conn, fmstate->copy_query)) + pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query); + + /* get the COPY result */ + res = pgfdw_get_result(fmstate->conn); + if (PQresultStatus(res) != PGRES_COPY_IN) + pgfdw_report_error(res, fmstate->conn, fmstate->copy_query); + + /* Convert the TupleTableSlot data into a TEXT-formatted line */ + initStringInfo(©_data); + for (i = 0; i < *numSlots; i++) + { + convert_slot_to_copy_text(©_data, fmstate, slots[i]); + + /* + * Send initial COPY data if the buffer reach the limit to avoid large + * memory usage. + */ + if (copy_data.len >= COPYBUFSIZ) + { + if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0) + pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query); + resetStringInfo(©_data); + } + } + + /* Send the remaining COPY data */ + if (copy_data.len > 0) + { + if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0) + pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query); + } + + /* End the COPY operation */ + if (PQputCopyEnd(fmstate->conn, NULL) < 0 || PQflush(fmstate->conn)) + pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query); + + /* + * Get the result, and check for success. 
+	 */
+	res = pgfdw_get_result(fmstate->conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pgfdw_report_error(res, fmstate->conn, fmstate->copy_query);
+
+	n_rows = atoi(PQcmdTuples(res));
+
+	/* And clean up */
+	PQclear(res);
+
+	MemoryContextReset(fmstate->temp_cxt);
+
+	*numSlots = n_rows;
+
+	/*
+	 * Return NULL if nothing was inserted on the remote end
+	 */
+	return (n_rows > 0) ? slots : NULL;
+}
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index e69735298d7..c0198b865f3 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel,
 							 char *orig_query, List *target_attrs,
 							 int values_end_len, int num_params,
 							 int num_rows);
+extern void buildCopySql(StringInfo buf, Relation rel, List *target_attrs);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs,
-- 
2.51.0
