On 1/13/21 10:15 AM, Amit Langote wrote:
> Hi Tomas, Tsunakawa-san,
>
> Thanks for your work on this.
>
> On Tue, Jan 12, 2021 at 11:06 AM Tomas Vondra
> <[email protected]> wrote:
>> AFAICS the discussions about making this use COPY and/or libpq
>> pipelining (neither of which is committed yet) ended with the conclusion
>> that those changes are somewhat independent, and that it's worth getting
>> this committed in the current form. Barring objections, I'll push this
>> within the next couple days.
>
> I was trying this out today (been meaning to do so for a while) and
> noticed that this fails when there are AFTER ROW triggers on the
> foreign table. Here's an example:
>
> create extension postgres_fdw ;
> create server lb foreign data wrapper postgres_fdw ;
> create user mapping for current_user server lb;
> create table p (a numeric primary key);
> create foreign table fp (a int) server lb options (table_name 'p');
> create function print_row () returns trigger as $$ begin raise notice
> '%', new; return null; end; $$ language plpgsql;
> create trigger after_insert_trig after insert on fp for each row
> execute function print_row();
> insert into fp select generate_series (1, 10);
> <crashes>
>
> Apparently, the new code seems to assume that batching wouldn't be
> active when the original query contains RETURNING clause but some
> parts fail to account for the case where RETURNING is added to the
> query to retrieve the tuple to pass to the AFTER TRIGGER.
> Specifically, the Assert in the following block in
> execute_foreign_modify() is problematic:
>
> /* Check number of rows affected, and fetch RETURNING tuple if any */
> if (fmstate->has_returning)
> {
> Assert(*numSlots == 1);
> n_rows = PQntuples(res);
> if (n_rows > 0)
> store_returning_result(fmstate, slots[0], res);
> }
>
Thanks for the report. Yeah, I think there's a missing check in
ExecInsert. Adding
(!resultRelInfo->ri_TrigDesc->trig_insert_after_row)
solves this. But now I'm wondering if this is the wrong place to make
this decision. I mean, why should we make the decision here, when the
decision whether to have a RETURNING clause is made in postgres_fdw in
deparseReturningList? We don't really know what the other FDWs will do,
for example.
So I think we should just move all of this into GetModifyBatchSize. We
can start with ri_BatchSize = 0. And then do
if (resultRelInfo->ri_BatchSize == 0)
resultRelInfo->ri_BatchSize =
resultRelInfo->ri_FdwRoutine->GetModifyBatchSize(resultRelInfo);
if (resultRelInfo->ri_BatchSize > 1)
{
... do batching ...
}
The GetModifyBatchSize would always return value > 0, so either 1 (no
batching) or >1 (batching).
regards
--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 3cf7b4eb1e..2d38ab25cb 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -1711,7 +1711,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
Index rtindex, Relation rel,
List *targetAttrs, bool doNothing,
List *withCheckOptionList, List *returningList,
- List **retrieved_attrs)
+ List **retrieved_attrs, int *values_end_len)
{
AttrNumber pindex;
bool first;
@@ -1754,6 +1754,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
}
else
appendStringInfoString(buf, " DEFAULT VALUES");
+ *values_end_len = buf->len;
if (doNothing)
appendStringInfoString(buf, " ON CONFLICT DO NOTHING");
@@ -1763,6 +1764,46 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
withCheckOptionList, returningList, retrieved_attrs);
}
+/*
+ * rebuild remote INSERT statement
+ *
+ */
+void
+rebuildInsertSql(StringInfo buf, char *orig_query,
+ int values_end_len, int num_cols,
+ int num_rows)
+{
+ int i, j;
+ int pindex;
+ bool first;
+
+ /* Copy up to the end of the first record from the original query */
+ appendBinaryStringInfo(buf, orig_query, values_end_len);
+
+ /* Add records to VALUES clause */
+ pindex = num_cols + 1;
+ for (i = 0; i < num_rows; i++)
+ {
+ appendStringInfoString(buf, ", (");
+
+ first = true;
+ for (j = 0; j < num_cols; j++)
+ {
+ if (!first)
+ appendStringInfoString(buf, ", ");
+ first = false;
+
+ appendStringInfo(buf, "$%d", pindex);
+ pindex++;
+ }
+
+ appendStringInfoChar(buf, ')');
+ }
+
+ /* Copy stuff after VALUES clause from the original query */
+ appendStringInfoString(buf, orig_query + values_end_len);
+}
+
/*
* deparse remote UPDATE statement
*
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index c11092f8cc..96bad17ded 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8911,7 +8911,7 @@ DO $d$
END;
$d$;
ERROR: invalid option "password"
-HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size
+HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size
CONTEXT: SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')"
PL/pgSQL function inline_code_block line 3 at EXECUTE
-- If we add a password for our user mapping instead, we should get a different
@@ -9053,3 +9053,117 @@ SELECT 1 FROM ft1 LIMIT 1;
ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off');
-- The invalid connection gets closed in pgfdw_xact_callback during commit.
COMMIT;
+-- ===================================================================
+-- batch insert
+-- ===================================================================
+BEGIN;
+CREATE SERVER batch10 FOREIGN DATA WRAPPER postgres_fdw OPTIONS( batch_size '10' );
+SELECT count(*)
+FROM pg_foreign_server
+WHERE srvname = 'batch10'
+AND srvoptions @> array['batch_size=10'];
+ count
+-------
+ 1
+(1 row)
+
+ALTER SERVER batch10 OPTIONS( SET batch_size '20' );
+SELECT count(*)
+FROM pg_foreign_server
+WHERE srvname = 'batch10'
+AND srvoptions @> array['batch_size=10'];
+ count
+-------
+ 0
+(1 row)
+
+SELECT count(*)
+FROM pg_foreign_server
+WHERE srvname = 'batch10'
+AND srvoptions @> array['batch_size=20'];
+ count
+-------
+ 1
+(1 row)
+
+CREATE FOREIGN TABLE table30 ( x int ) SERVER batch10 OPTIONS ( batch_size '30' );
+SELECT COUNT(*)
+FROM pg_foreign_table
+WHERE ftrelid = 'table30'::regclass
+AND ftoptions @> array['batch_size=30'];
+ count
+-------
+ 1
+(1 row)
+
+ALTER FOREIGN TABLE table30 OPTIONS ( SET batch_size '40');
+SELECT COUNT(*)
+FROM pg_foreign_table
+WHERE ftrelid = 'table30'::regclass
+AND ftoptions @> array['batch_size=30'];
+ count
+-------
+ 0
+(1 row)
+
+SELECT COUNT(*)
+FROM pg_foreign_table
+WHERE ftrelid = 'table30'::regclass
+AND ftoptions @> array['batch_size=40'];
+ count
+-------
+ 1
+(1 row)
+
+ROLLBACK;
+CREATE TABLE batch_table ( x int );
+CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '10' );
+INSERT INTO ftable SELECT * FROM generate_series(1, 10) i;
+INSERT INTO ftable SELECT * FROM generate_series(11, 31) i;
+INSERT INTO ftable VALUES (32);
+INSERT INTO ftable VALUES (33), (34);
+SELECT COUNT(*) FROM ftable;
+ count
+-------
+ 34
+(1 row)
+
+TRUNCATE batch_table;
+DROP FOREIGN TABLE ftable;
+-- Disable batch insert
+CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '1' );
+INSERT INTO ftable VALUES (1), (2);
+SELECT COUNT(*) FROM ftable;
+ count
+-------
+ 2
+(1 row)
+
+DROP FOREIGN TABLE ftable;
+DROP TABLE batch_table;
+-- Use partitioning
+CREATE TABLE batch_table ( x int ) PARTITION BY HASH (x);
+CREATE TABLE batch_table_p0 (LIKE batch_table);
+CREATE FOREIGN TABLE batch_table_p0f
+ PARTITION OF batch_table
+ FOR VALUES WITH (MODULUS 3, REMAINDER 0)
+ SERVER loopback
+ OPTIONS (table_name 'batch_table_p0', batch_size '10');
+CREATE TABLE batch_table_p1 (LIKE batch_table);
+CREATE FOREIGN TABLE batch_table_p1f
+ PARTITION OF batch_table
+ FOR VALUES WITH (MODULUS 3, REMAINDER 1)
+ SERVER loopback
+ OPTIONS (table_name 'batch_table_p1', batch_size '1');
+CREATE TABLE batch_table_p2
+ PARTITION OF batch_table
+ FOR VALUES WITH (MODULUS 3, REMAINDER 2);
+INSERT INTO batch_table SELECT * FROM generate_series(1, 66) i;
+SELECT COUNT(*) FROM batch_table;
+ count
+-------
+ 66
+(1 row)
+
+-- Clean up
+DROP TABLE batch_table CASCADE;
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 1fec3c3eea..64698c4da3 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -142,6 +142,17 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
errmsg("%s requires a non-negative integer value",
def->defname)));
}
+ else if (strcmp(def->defname, "batch_size") == 0)
+ {
+ int batch_size;
+
+ batch_size = strtol(defGetString(def), NULL, 10);
+ if (batch_size <= 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("%s requires a non-negative integer value",
+ def->defname)));
+ }
else if (strcmp(def->defname, "password_required") == 0)
{
bool pw_required = defGetBoolean(def);
@@ -203,6 +214,9 @@ InitPgFdwOptions(void)
/* fetch_size is available on both server and table */
{"fetch_size", ForeignServerRelationId, false},
{"fetch_size", ForeignTableRelationId, false},
+ /* batch_size is available on both server and table */
+ {"batch_size", ForeignServerRelationId, false},
+ {"batch_size", ForeignTableRelationId, false},
{"password_required", UserMappingRelationId, false},
/*
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 2f2d4d171c..e6b1403ff1 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -87,8 +87,10 @@ enum FdwScanPrivateIndex
* 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
* 2) Integer list of target attribute numbers for INSERT/UPDATE
* (NIL for a DELETE)
- * 3) Boolean flag showing if the remote query has a RETURNING clause
- * 4) Integer list of attribute numbers retrieved by RETURNING, if any
+ * 3) Length till the end of VALUES clause for INSERT
+ * (-1 for a DELETE/UPDATE)
+ * 4) Boolean flag showing if the remote query has a RETURNING clause
+ * 5) Integer list of attribute numbers retrieved by RETURNING, if any
*/
enum FdwModifyPrivateIndex
{
@@ -96,6 +98,8 @@ enum FdwModifyPrivateIndex
FdwModifyPrivateUpdateSql,
/* Integer list of target attribute numbers for INSERT/UPDATE */
FdwModifyPrivateTargetAttnums,
+ /* Length till the end of VALUES clause (as an integer Value node) */
+ FdwModifyPrivateLen,
/* has-returning flag (as an integer Value node) */
FdwModifyPrivateHasReturning,
/* Integer list of attribute numbers retrieved by RETURNING */
@@ -176,7 +180,10 @@ typedef struct PgFdwModifyState
/* extracted fdw_private data */
char *query; /* text of INSERT/UPDATE/DELETE command */
+ char *orig_query; /* original text of INSERT command */
List *target_attrs; /* list of target attribute numbers */
+ int values_end; /* length up to the end of VALUES */
+ int batch_size; /* value of FDW option "batch_size" */
bool has_returning; /* is there a RETURNING clause? */
List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
@@ -185,6 +192,9 @@ typedef struct PgFdwModifyState
int p_nums; /* number of parameters to transmit */
FmgrInfo *p_flinfo; /* output conversion functions for them */
+ /* batch operation stuff */
+ int num_slots; /* number of slots to insert */
+
/* working memory context */
MemoryContext temp_cxt; /* context for per-tuple temporary data */
@@ -343,6 +353,12 @@ static TupleTableSlot *postgresExecForeignInsert(EState *estate,
ResultRelInfo *resultRelInfo,
TupleTableSlot *slot,
TupleTableSlot *planSlot);
+static TupleTableSlot **postgresExecForeignBatchInsert(EState *estate,
+ ResultRelInfo *resultRelInfo,
+ TupleTableSlot **slots,
+ TupleTableSlot **planSlots,
+ int *numSlots);
+static int postgresGetModifyBatchSize(ResultRelInfo *resultRelInfo);
static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
ResultRelInfo *resultRelInfo,
TupleTableSlot *slot,
@@ -429,20 +445,24 @@ static PgFdwModifyState *create_foreign_modify(EState *estate,
Plan *subplan,
char *query,
List *target_attrs,
+ int len,
bool has_returning,
List *retrieved_attrs);
-static TupleTableSlot *execute_foreign_modify(EState *estate,
+static TupleTableSlot **execute_foreign_modify(EState *estate,
ResultRelInfo *resultRelInfo,
CmdType operation,
- TupleTableSlot *slot,
- TupleTableSlot *planSlot);
+ TupleTableSlot **slots,
+ TupleTableSlot **planSlots,
+ int *numSlots);
static void prepare_foreign_modify(PgFdwModifyState *fmstate);
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
ItemPointer tupleid,
- TupleTableSlot *slot);
+ TupleTableSlot **slots,
+ int numSlots);
static void store_returning_result(PgFdwModifyState *fmstate,
TupleTableSlot *slot, PGresult *res);
static void finish_foreign_modify(PgFdwModifyState *fmstate);
+static void deallocate_query(PgFdwModifyState *fmstate);
static List *build_remote_returning(Index rtindex, Relation rel,
List *returningList);
static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
@@ -530,6 +550,8 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
routine->PlanForeignModify = postgresPlanForeignModify;
routine->BeginForeignModify = postgresBeginForeignModify;
routine->ExecForeignInsert = postgresExecForeignInsert;
+ routine->ExecForeignBatchInsert = postgresExecForeignBatchInsert;
+ routine->GetModifyBatchSize = postgresGetModifyBatchSize;
routine->ExecForeignUpdate = postgresExecForeignUpdate;
routine->ExecForeignDelete = postgresExecForeignDelete;
routine->EndForeignModify = postgresEndForeignModify;
@@ -1665,6 +1687,7 @@ postgresPlanForeignModify(PlannerInfo *root,
List *returningList = NIL;
List *retrieved_attrs = NIL;
bool doNothing = false;
+ int values_end_len = -1;
initStringInfo(&sql);
@@ -1752,7 +1775,7 @@ postgresPlanForeignModify(PlannerInfo *root,
deparseInsertSql(&sql, rte, resultRelation, rel,
targetAttrs, doNothing,
withCheckOptionList, returningList,
- &retrieved_attrs);
+ &retrieved_attrs, &values_end_len);
break;
case CMD_UPDATE:
deparseUpdateSql(&sql, rte, resultRelation, rel,
@@ -1776,8 +1799,9 @@ postgresPlanForeignModify(PlannerInfo *root,
* Build the fdw_private list that will be available to the executor.
* Items in the list must match enum FdwModifyPrivateIndex, above.
*/
- return list_make4(makeString(sql.data),
+ return list_make5(makeString(sql.data),
targetAttrs,
+ makeInteger(values_end_len),
makeInteger((retrieved_attrs != NIL)),
retrieved_attrs);
}
@@ -1797,6 +1821,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
char *query;
List *target_attrs;
bool has_returning;
+ int values_end_len;
List *retrieved_attrs;
RangeTblEntry *rte;
@@ -1812,6 +1837,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
FdwModifyPrivateUpdateSql));
target_attrs = (List *) list_nth(fdw_private,
FdwModifyPrivateTargetAttnums);
+ values_end_len = intVal(list_nth(fdw_private,
+ FdwModifyPrivateLen));
has_returning = intVal(list_nth(fdw_private,
FdwModifyPrivateHasReturning));
retrieved_attrs = (List *) list_nth(fdw_private,
@@ -1829,6 +1856,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
mtstate->mt_plans[subplan_index]->plan,
query,
target_attrs,
+ values_end_len,
has_returning,
retrieved_attrs);
@@ -1846,7 +1874,8 @@ postgresExecForeignInsert(EState *estate,
TupleTableSlot *planSlot)
{
PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
- TupleTableSlot *rslot;
+ TupleTableSlot **rslot;
+ int numSlots = 1;
/*
* If the fmstate has aux_fmstate set, use the aux_fmstate (see
@@ -1855,7 +1884,36 @@ postgresExecForeignInsert(EState *estate,
if (fmstate->aux_fmstate)
resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
- slot, planSlot);
+ &slot, &planSlot, &numSlots);
+ /* Revert that change */
+ if (fmstate->aux_fmstate)
+ resultRelInfo->ri_FdwState = fmstate;
+
+ return rslot ? *rslot : NULL;
+}
+
+/*
+ * postgresExecForeignBatchInsert
+ * Insert multiple rows into a foreign table
+ */
+static TupleTableSlot **
+postgresExecForeignBatchInsert(EState *estate,
+ ResultRelInfo *resultRelInfo,
+ TupleTableSlot **slots,
+ TupleTableSlot **planSlots,
+ int *numSlots)
+{
+ PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+ TupleTableSlot **rslot;
+
+ /*
+ * If the fmstate has aux_fmstate set, use the aux_fmstate (see
+ * postgresBeginForeignInsert())
+ */
+ if (fmstate->aux_fmstate)
+ resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
+ rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
+ slots, planSlots, numSlots);
/* Revert that change */
if (fmstate->aux_fmstate)
resultRelInfo->ri_FdwState = fmstate;
@@ -1863,6 +1921,16 @@ postgresExecForeignInsert(EState *estate,
return rslot;
}
+/*
+ * postgresGetModifyBatchSize
+ * Report the maximum number of tuples that can be inserted in bulk
+ */
+static int
+postgresGetModifyBatchSize(ResultRelInfo *resultRelInfo)
+{
+ return ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->batch_size;
+}
+
/*
* postgresExecForeignUpdate
* Update one row in a foreign table
@@ -1873,8 +1941,13 @@ postgresExecForeignUpdate(EState *estate,
TupleTableSlot *slot,
TupleTableSlot *planSlot)
{
- return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
- slot, planSlot);
+ TupleTableSlot **rslot;
+ int numSlots = 1;
+
+ rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
+ &slot, &planSlot, &numSlots);
+
+ return rslot ? rslot[0] : NULL;
}
/*
@@ -1887,8 +1960,13 @@ postgresExecForeignDelete(EState *estate,
TupleTableSlot *slot,
TupleTableSlot *planSlot)
{
- return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
- slot, planSlot);
+ TupleTableSlot **rslot;
+ int numSlots = 1;
+
+ rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
+ &slot, &planSlot, &numSlots);
+
+ return rslot ? rslot[0] : NULL;
}
/*
@@ -1925,6 +2003,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
RangeTblEntry *rte;
TupleDesc tupdesc = RelationGetDescr(rel);
int attnum;
+ int values_end_len;
StringInfoData sql;
List *targetAttrs = NIL;
List *retrieved_attrs = NIL;
@@ -2001,7 +2080,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
resultRelInfo->ri_WithCheckOptions,
resultRelInfo->ri_returningList,
- &retrieved_attrs);
+ &retrieved_attrs, &values_end_len);
/* Construct an execution state. */
fmstate = create_foreign_modify(mtstate->ps.state,
@@ -2011,6 +2090,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
NULL,
sql.data,
targetAttrs,
+ values_end_len,
retrieved_attrs != NIL,
retrieved_attrs);
@@ -2636,6 +2716,9 @@ postgresExplainForeignModify(ModifyTableState *mtstate,
FdwModifyPrivateUpdateSql));
ExplainPropertyText("Remote SQL", sql, es);
+
+ if (rinfo->ri_BatchSize > 0)
+ ExplainPropertyInteger("Batch Size", NULL, rinfo->ri_BatchSize, es);
}
}
@@ -3530,6 +3613,7 @@ create_foreign_modify(EState *estate,
Plan *subplan,
char *query,
List *target_attrs,
+ int values_end,
bool has_returning,
List *retrieved_attrs)
{
@@ -3538,6 +3622,7 @@ create_foreign_modify(EState *estate,
TupleDesc tupdesc = RelationGetDescr(rel);
Oid userid;
ForeignTable *table;
+ ForeignServer *server;
UserMapping *user;
AttrNumber n_params;
Oid typefnoid;
@@ -3564,7 +3649,10 @@ create_foreign_modify(EState *estate,
/* Set up remote query information. */
fmstate->query = query;
+ if (operation == CMD_INSERT)
+ fmstate->orig_query = pstrdup(fmstate->query);
fmstate->target_attrs = target_attrs;
+ fmstate->values_end = values_end;
fmstate->has_returning = has_returning;
fmstate->retrieved_attrs = retrieved_attrs;
@@ -3616,6 +3704,44 @@ create_foreign_modify(EState *estate,
Assert(fmstate->p_nums <= n_params);
+ /* Set batch_size from foreign server/table options. */
+ if (operation == CMD_INSERT)
+ {
+ /* Check the foreign table option. */
+ foreach(lc, table->options)
+ {
+ DefElem *def = (DefElem *) lfirst(lc);
+
+ if (strcmp(def->defname, "batch_size") == 0)
+ {
+ fmstate->batch_size = strtol(defGetString(def), NULL, 10);
+ break;
+ }
+ }
+
+ /* Check the foreign server option if the table option is not set. */
+ if (fmstate->batch_size == 0)
+ {
+ server = GetForeignServer(table->serverid);
+ foreach(lc, server->options)
+ {
+ DefElem *def = (DefElem *) lfirst(lc);
+
+ if (strcmp(def->defname, "batch_size") == 0)
+ {
+ fmstate->batch_size = strtol(defGetString(def), NULL, 10);
+ break;
+ }
+ }
+ }
+
+ /* If neither the table nor server option is set, set the default. */
+ if (fmstate->batch_size == 0)
+ fmstate->batch_size = 100;
+ }
+
+ fmstate->num_slots = 1;
+
/* Initialize auxiliary state */
fmstate->aux_fmstate = NULL;
@@ -3626,26 +3752,50 @@ create_foreign_modify(EState *estate,
* execute_foreign_modify
* Perform foreign-table modification as required, and fetch RETURNING
* result if any. (This is the shared guts of postgresExecForeignInsert,
- * postgresExecForeignUpdate, and postgresExecForeignDelete.)
+ * postgresExecForeignBatchInsert, postgresExecForeignUpdate, and
+ * postgresExecForeignDelete.)
*/
-static TupleTableSlot *
+static TupleTableSlot **
execute_foreign_modify(EState *estate,
ResultRelInfo *resultRelInfo,
CmdType operation,
- TupleTableSlot *slot,
- TupleTableSlot *planSlot)
+ TupleTableSlot **slots,
+ TupleTableSlot **planSlots,
+ int *numSlots)
{
PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
ItemPointer ctid = NULL;
const char **p_values;
PGresult *res;
int n_rows;
+ StringInfoData sql;
/* The operation should be INSERT, UPDATE, or DELETE */
Assert(operation == CMD_INSERT ||
operation == CMD_UPDATE ||
operation == CMD_DELETE);
+ /*
+ * If the existing query was deparsed and prepared for a different number
+ * of rows, rebuild it for the proper number.
+ */
+ if (operation == CMD_INSERT && fmstate->num_slots != *numSlots)
+ {
+ /* Destroy the prepared statement created previously */
+ if (fmstate->p_name)
+ deallocate_query(fmstate);
+
+ /*
+ * Build INSERT string with numSlots records in its VALUES clause.
+ */
+ initStringInfo(&sql);
+ rebuildInsertSql(&sql, fmstate->orig_query, fmstate->values_end,
+ fmstate->p_nums, *numSlots - 1);
+ pfree(fmstate->query);
+ fmstate->query = sql.data;
+ fmstate->num_slots = *numSlots;
+ }
+
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
prepare_foreign_modify(fmstate);
@@ -3658,7 +3808,7 @@ execute_foreign_modify(EState *estate,
Datum datum;
bool isNull;
- datum = ExecGetJunkAttribute(planSlot,
+ datum = ExecGetJunkAttribute(planSlots[0],
fmstate->ctidAttno,
&isNull);
/* shouldn't ever get a null result... */
@@ -3668,14 +3818,14 @@ execute_foreign_modify(EState *estate,
}
/* Convert parameters needed by prepared statement to text form */
- p_values = convert_prep_stmt_params(fmstate, ctid, slot);
+ p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots);
/*
* Execute the prepared statement.
*/
if (!PQsendQueryPrepared(fmstate->conn,
fmstate->p_name,
- fmstate->p_nums,
+ fmstate->p_nums * (*numSlots),
p_values,
NULL,
NULL,
@@ -3696,9 +3846,10 @@ execute_foreign_modify(EState *estate,
/* Check number of rows affected, and fetch RETURNING tuple if any */
if (fmstate->has_returning)
{
+ Assert(*numSlots == 1);
n_rows = PQntuples(res);
if (n_rows > 0)
- store_returning_result(fmstate, slot, res);
+ store_returning_result(fmstate, slots[0], res);
}
else
n_rows = atoi(PQcmdTuples(res));
@@ -3708,10 +3859,12 @@ execute_foreign_modify(EState *estate,
MemoryContextReset(fmstate->temp_cxt);
+ *numSlots = n_rows;
+
/*
* Return NULL if nothing was inserted/updated/deleted on the remote end
*/
- return (n_rows > 0) ? slot : NULL;
+ return (n_rows > 0) ? slots : NULL;
}
/*
@@ -3771,52 +3924,64 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
static const char **
convert_prep_stmt_params(PgFdwModifyState *fmstate,
ItemPointer tupleid,
- TupleTableSlot *slot)
+ TupleTableSlot **slots,
+ int numSlots)
{
const char **p_values;
+ int i;
+ int j;
int pindex = 0;
MemoryContext oldcontext;
oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
- p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
+ p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots);
+
+ /* ctid is provided only for UPDATE/DELETE, which don't allow batching */
+ Assert(!(tupleid != NULL && numSlots > 1));
/* 1st parameter should be ctid, if it's in use */
if (tupleid != NULL)
{
+ Assert(numSlots == 1);
/* don't need set_transmission_modes for TID output */
p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
PointerGetDatum(tupleid));
pindex++;
}
- /* get following parameters from slot */
- if (slot != NULL && fmstate->target_attrs != NIL)
+ /* get following parameters from slots */
+ if (slots != NULL && fmstate->target_attrs != NIL)
{
int nestlevel;
ListCell *lc;
nestlevel = set_transmission_modes();
- foreach(lc, fmstate->target_attrs)
+ for (i = 0; i < numSlots; i++)
{
- int attnum = lfirst_int(lc);
- Datum value;
- bool isnull;
+ j = (tupleid != NULL) ? 1 : 0;
+ foreach(lc, fmstate->target_attrs)
+ {
+ int attnum = lfirst_int(lc);
+ Datum value;
+ bool isnull;
- value = slot_getattr(slot, attnum, &isnull);
- if (isnull)
- p_values[pindex] = NULL;
- else
- p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
- value);
- pindex++;
+ value = slot_getattr(slots[i], attnum, &isnull);
+ if (isnull)
+ p_values[pindex] = NULL;
+ else
+ p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j],
+ value);
+ pindex++;
+ j++;
+ }
}
reset_transmission_modes(nestlevel);
}
- Assert(pindex == fmstate->p_nums);
+ Assert(pindex == fmstate->p_nums * numSlots);
MemoryContextSwitchTo(oldcontext);
@@ -3870,29 +4035,41 @@ finish_foreign_modify(PgFdwModifyState *fmstate)
Assert(fmstate != NULL);
/* If we created a prepared statement, destroy it */
- if (fmstate->p_name)
- {
- char sql[64];
- PGresult *res;
-
- snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
-
- /*
- * We don't use a PG_TRY block here, so be careful not to throw error
- * without releasing the PGresult.
- */
- res = pgfdw_exec_query(fmstate->conn, sql);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
- PQclear(res);
- fmstate->p_name = NULL;
- }
+ deallocate_query(fmstate);
/* Release remote connection */
ReleaseConnection(fmstate->conn);
fmstate->conn = NULL;
}
+/*
+ * deallocate_query
+ * Deallocate a prepared statement for a foreign insert/update/delete
+ * operation
+ */
+static void
+deallocate_query(PgFdwModifyState *fmstate)
+{
+ char sql[64];
+ PGresult *res;
+
+ /* do nothing if the query is not allocated */
+ if (!fmstate->p_name)
+ return;
+
+ snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
+
+ /*
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ res = pgfdw_exec_query(fmstate->conn, sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+ PQclear(res);
+ fmstate->p_name = NULL;
+}
+
/*
* build_remote_returning
* Build a RETURNING targetlist of a remote query for performing an
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 19ea27a1bc..1f67b4d9fd 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -161,7 +161,10 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
Index rtindex, Relation rel,
List *targetAttrs, bool doNothing,
List *withCheckOptionList, List *returningList,
- List **retrieved_attrs);
+ List **retrieved_attrs, int *values_end_len);
+extern void rebuildInsertSql(StringInfo buf, char *orig_query,
+ int values_end_len, int num_cols,
+ int num_rows);
extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
Index rtindex, Relation rel,
List *targetAttrs,
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 25dbc08b98..fd5abf2471 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2711,3 +2711,94 @@ SELECT 1 FROM ft1 LIMIT 1;
ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off');
-- The invalid connection gets closed in pgfdw_xact_callback during commit.
COMMIT;
+
+-- ===================================================================
+-- batch insert
+-- ===================================================================
+
+BEGIN;
+
+CREATE SERVER batch10 FOREIGN DATA WRAPPER postgres_fdw OPTIONS( batch_size '10' );
+
+SELECT count(*)
+FROM pg_foreign_server
+WHERE srvname = 'batch10'
+AND srvoptions @> array['batch_size=10'];
+
+ALTER SERVER batch10 OPTIONS( SET batch_size '20' );
+
+SELECT count(*)
+FROM pg_foreign_server
+WHERE srvname = 'batch10'
+AND srvoptions @> array['batch_size=10'];
+
+SELECT count(*)
+FROM pg_foreign_server
+WHERE srvname = 'batch10'
+AND srvoptions @> array['batch_size=20'];
+
+CREATE FOREIGN TABLE table30 ( x int ) SERVER batch10 OPTIONS ( batch_size '30' );
+
+SELECT COUNT(*)
+FROM pg_foreign_table
+WHERE ftrelid = 'table30'::regclass
+AND ftoptions @> array['batch_size=30'];
+
+ALTER FOREIGN TABLE table30 OPTIONS ( SET batch_size '40');
+
+SELECT COUNT(*)
+FROM pg_foreign_table
+WHERE ftrelid = 'table30'::regclass
+AND ftoptions @> array['batch_size=30'];
+
+SELECT COUNT(*)
+FROM pg_foreign_table
+WHERE ftrelid = 'table30'::regclass
+AND ftoptions @> array['batch_size=40'];
+
+ROLLBACK;
+
+CREATE TABLE batch_table ( x int );
+
+CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '10' );
+INSERT INTO ftable SELECT * FROM generate_series(1, 10) i;
+INSERT INTO ftable SELECT * FROM generate_series(11, 31) i;
+INSERT INTO ftable VALUES (32);
+INSERT INTO ftable VALUES (33), (34);
+SELECT COUNT(*) FROM ftable;
+TRUNCATE batch_table;
+DROP FOREIGN TABLE ftable;
+
+-- Disable batch insert
+CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '1' );
+INSERT INTO ftable VALUES (1), (2);
+SELECT COUNT(*) FROM ftable;
+DROP FOREIGN TABLE ftable;
+DROP TABLE batch_table;
+
+-- Use partitioning
+CREATE TABLE batch_table ( x int ) PARTITION BY HASH (x);
+
+CREATE TABLE batch_table_p0 (LIKE batch_table);
+CREATE FOREIGN TABLE batch_table_p0f
+ PARTITION OF batch_table
+ FOR VALUES WITH (MODULUS 3, REMAINDER 0)
+ SERVER loopback
+ OPTIONS (table_name 'batch_table_p0', batch_size '10');
+
+CREATE TABLE batch_table_p1 (LIKE batch_table);
+CREATE FOREIGN TABLE batch_table_p1f
+ PARTITION OF batch_table
+ FOR VALUES WITH (MODULUS 3, REMAINDER 1)
+ SERVER loopback
+ OPTIONS (table_name 'batch_table_p1', batch_size '1');
+
+CREATE TABLE batch_table_p2
+ PARTITION OF batch_table
+ FOR VALUES WITH (MODULUS 3, REMAINDER 2);
+
+INSERT INTO batch_table SELECT * FROM generate_series(1, 66) i;
+SELECT COUNT(*) FROM batch_table;
+
+-- Clean up
+DROP TABLE batch_table CASCADE;
diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml
index 9c9293414c..02a34b40b3 100644
--- a/doc/src/sgml/fdwhandler.sgml
+++ b/doc/src/sgml/fdwhandler.sgml
@@ -523,8 +523,9 @@ BeginForeignModify(ModifyTableState *mtstate,
Begin executing a foreign table modification operation. This routine is
called during executor startup. It should perform any initialization
needed prior to the actual table modifications. Subsequently,
- <function>ExecForeignInsert</function>, <function>ExecForeignUpdate</function> or
- <function>ExecForeignDelete</function> will be called for each tuple to be
+ <function>ExecForeignInsert/ExecForeignBatchInsert</function>,
+ <function>ExecForeignUpdate</function> or
+ <function>ExecForeignDelete</function> will be called for tuple(s) to be
inserted, updated, or deleted.
</para>
@@ -614,6 +615,81 @@ ExecForeignInsert(EState *estate,
<para>
<programlisting>
+TupleTableSlot **
+ExecForeignBatchInsert(EState *estate,
+ ResultRelInfo *rinfo,
+ TupleTableSlot **slots,
+ TupleTableSlot *planSlots,
+ int *numSlots);
+</programlisting>
+
+ Insert multiple tuples in bulk into the foreign table.
+ The parameters are the same for <function>ExecForeignInsert</function>
+ except <literal>slots</literal> and <literal>planSlots</literal> contain
+ multiple tuples and <literal>*numSlots></literal> specifies the number of
+ tuples in those arrays.
+ </para>
+
+ <para>
+ The return value is an array of slots containing the data that was
+ actually inserted (this might differ from the data supplied, for
+ example as a result of trigger actions.)
+ The passed-in <literal>slots</literal> can be re-used for this purpose.
+ The number of successfully inserted tuples is returned in
+ <literal>*numSlots</literal>.
+ </para>
+
+ <para>
+ The data in the returned slot is used only if the <command>INSERT</command>
+ statement involves a view
+ <literal>WITH CHECK OPTION</literal>; or if the foreign table has
+ an <literal>AFTER ROW</literal> trigger. Triggers require all columns,
+ but the FDW could choose to optimize away returning some or all columns
+ depending on the contents of the
+ <literal>WITH CHECK OPTION</literal> constraints.
+ </para>
+
+ <para>
+ If the <function>ExecForeignBatchInsert</function> or
+ <function>GetModifyBatchSize</function> pointer is set to
+ <literal>NULL</literal>, attempts to insert into the foreign table will
+ use <function>ExecForeignInsert</function>.
+ This function is not used if the <command>INSERT</command> has the
+ <literal>RETURNING></literal> clause.
+ </para>
+
+ <para>
+ Note that this function is also called when inserting routed tuples into
+ a foreign-table partition. See the callback functions
+ described below that allow the FDW to support that.
+ </para>
+
+ <para>
+<programlisting>
+int
+GetModifyBatchSize(ResultRelInfo *rinfo);
+</programlisting>
+
+ Report the maximum number of tuples that a single
+ <function>ExecForeignBatchInsert</function> call can handle for
+ the specified foreign table. That is, The executor passes at most
+ the number of tuples that this function returns to
+ <function>ExecForeignBatchInsert</function>.
+ <literal>rinfo</literal> is the <structname>ResultRelInfo</structname> struct describing
+ the target foreign table.
+ The FDW is expected to provide a foreign server and/or foreign
+ table option for the user to set this value, or some hard-coded value.
+ </para>
+
+ <para>
+ If the <function>ExecForeignBatchInsert</function> or
+ <function>GetModifyBatchSize</function> pointer is set to
+ <literal>NULL</literal>, attempts to insert into the foreign table will
+ use <function>ExecForeignInsert</function>.
+ </para>
+
+ <para>
+<programlisting>
TupleTableSlot *
ExecForeignUpdate(EState *estate,
ResultRelInfo *rinfo,
@@ -741,8 +817,9 @@ BeginForeignInsert(ModifyTableState *mtstate,
in both cases when it is the partition chosen for tuple routing and the
target specified in a <command>COPY FROM</command> command. It should
perform any initialization needed prior to the actual insertion.
- Subsequently, <function>ExecForeignInsert</function> will be called for
- each tuple to be inserted into the foreign table.
+ Subsequently, <function>ExecForeignInsert</function> or
+ <function>ExecForeignBatchInsert</function> will be called for
+ tuple(s) to be inserted into the foreign table.
</para>
<para>
@@ -773,8 +850,8 @@ BeginForeignInsert(ModifyTableState *mtstate,
<para>
Note that if the FDW does not support routable foreign-table partitions
and/or executing <command>COPY FROM</command> on foreign tables, this
- function or <function>ExecForeignInsert</function> subsequently called
- must throw error as needed.
+ function or <function>ExecForeignInsert/ExecForeignBatchInsert</function>
+ subsequently called must throw error as needed.
</para>
<para>
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index e6fd2143c1..97eeb64a02 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -354,6 +354,19 @@ OPTIONS (ADD password_required 'false');
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><literal>batch_size</literal></term>
+ <listitem>
+ <para>
+ This option specifies the number of rows <filename>postgres_fdw</filename>
+ should insert in each insert operation. It can be specified for a
+ foreign table or a foreign server. The option specified on a table
+ overrides an option specified for the server.
+ The default is <literal>100</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
</variablelist>
</sect3>
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index 941731a0a9..b0a354ad6f 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -2192,3 +2192,14 @@ find_matching_subplans_recurse(PartitionPruningData *prunedata,
}
}
}
+
+/*
+ * ExecGetTouchedPartitions -- Get the partitions touched by
+ * this routing
+ */
+ResultRelInfo **
+ExecGetTouchedPartitions(PartitionTupleRouting *proute, int *count)
+{
+ *count = proute->num_partitions;
+ return proute->partitions;
+}
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index d7b8f65591..7ab99ceb53 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -58,6 +58,13 @@
#include "utils/rel.h"
+static void ExecBatchInsert(ModifyTableState *mtstate,
+ ResultRelInfo *resultRelInfo,
+ TupleTableSlot **slots,
+ TupleTableSlot **planSlots,
+ int numSlots,
+ EState *estate,
+ bool canSetTag);
static bool ExecOnConflictUpdate(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo,
ItemPointer conflictTid,
@@ -389,6 +396,7 @@ ExecInsert(ModifyTableState *mtstate,
ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
OnConflictAction onconflict = node->onConflictAction;
PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
+ MemoryContext oldContext;
/*
* If the input result relation is a partitioned table, find the leaf
@@ -441,6 +449,71 @@ ExecInsert(ModifyTableState *mtstate,
ExecComputeStoredGenerated(resultRelInfo, estate, slot,
CMD_INSERT);
+ /*
+ * Determine if the FDW supports batch insert and determine the batch
+ * size (a FDW may support batching, but it mayb e disabled for the
+ * server/table). Do this only once, at the beginning - we don't want
+ * the batch size to change during execution.
+ */
+ if (resultRelInfo->ri_FdwRoutine->GetModifyBatchSize &&
+ resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert &&
+ resultRelInfo->ri_projectReturning == NULL &&
+ (!resultRelInfo->ri_TrigDesc->trig_insert_after_row) &&
+ resultRelInfo->ri_BatchSize == 0)
+ resultRelInfo->ri_BatchSize =
+ resultRelInfo->ri_FdwRoutine->GetModifyBatchSize(resultRelInfo);
+
+ Assert(resultRelInfo->ri_BatchSize >= 0);
+
+ /*
+ * If the FDW supports batching, and batching is requested, accumulate
+ * rows and insert them in batches. Otherwise use the per-row inserts.
+ */
+ if (resultRelInfo->ri_BatchSize > 1)
+ {
+ /*
+ * If a certain number of tuples have already been accumulated,
+ * or a tuple has come for a different relation than that for
+ * the accumulated tuples, perform the batch insert
+ */
+ if (resultRelInfo->ri_NumSlots == resultRelInfo->ri_BatchSize)
+ {
+ ExecBatchInsert(mtstate, resultRelInfo,
+ resultRelInfo->ri_Slots,
+ resultRelInfo->ri_PlanSlots,
+ resultRelInfo->ri_NumSlots,
+ estate, canSetTag);
+ resultRelInfo->ri_NumSlots = 0;
+ }
+
+ oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
+
+ if (resultRelInfo->ri_Slots == NULL)
+ {
+ resultRelInfo->ri_Slots = palloc(sizeof(TupleTableSlot *) *
+ resultRelInfo->ri_BatchSize);
+ resultRelInfo->ri_PlanSlots = palloc(sizeof(TupleTableSlot *) *
+ resultRelInfo->ri_BatchSize);
+ }
+
+ resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] =
+ MakeSingleTupleTableSlot(slot->tts_tupleDescriptor,
+ slot->tts_ops);
+ ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots],
+ slot);
+ resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] =
+ MakeSingleTupleTableSlot(planSlot->tts_tupleDescriptor,
+ planSlot->tts_ops);
+ ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
+ planSlot);
+
+ resultRelInfo->ri_NumSlots++;
+
+ MemoryContextSwitchTo(oldContext);
+
+ return NULL;
+ }
+
/*
* insert into foreign table: let the FDW do it
*/
@@ -698,6 +771,70 @@ ExecInsert(ModifyTableState *mtstate,
return result;
}
+/* ----------------------------------------------------------------
+ * ExecBatchInsert
+ *
+ * Insert multiple tuples in an efficient way.
+ * Currently, this handles inserting into a foreign table without
+ * RETURNING clause.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecBatchInsert(ModifyTableState *mtstate,
+ ResultRelInfo *resultRelInfo,
+ TupleTableSlot **slots,
+ TupleTableSlot **planSlots,
+ int numSlots,
+ EState *estate,
+ bool canSetTag)
+{
+ int i;
+ int numInserted = numSlots;
+ TupleTableSlot *slot = NULL;
+ TupleTableSlot **rslots;
+
+ /*
+ * insert into foreign table: let the FDW do it
+ */
+ rslots = resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
+ resultRelInfo,
+ slots,
+ planSlots,
+ &numInserted);
+
+ for (i = 0; i < numInserted; i++)
+ {
+ slot = rslots[i];
+
+ /*
+ * AFTER ROW Triggers or RETURNING expressions might reference the
+ * tableoid column, so (re-)initialize tts_tableOid before evaluating
+ * them.
+ */
+ slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+
+ /* AFTER ROW INSERT Triggers */
+ ExecARInsertTriggers(estate, resultRelInfo, slot, NIL,
+ mtstate->mt_transition_capture);
+
+ /*
+ * Check any WITH CHECK OPTION constraints from parent views. See the
+ * comment in ExecInsert.
+ */
+ if (resultRelInfo->ri_WithCheckOptions != NIL)
+ ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate);
+ }
+
+ if (canSetTag && numInserted > 0)
+ estate->es_processed += numInserted;
+
+ for (i = 0; i < numSlots; i++)
+ {
+ ExecDropSingleTupleTableSlot(slots[i]);
+ ExecDropSingleTupleTableSlot(planSlots[i]);
+ }
+}
+
/* ----------------------------------------------------------------
* ExecDelete
*
@@ -1937,6 +2074,9 @@ ExecModifyTable(PlanState *pstate)
ItemPointerData tuple_ctid;
HeapTupleData oldtupdata;
HeapTuple oldtuple;
+ PartitionTupleRouting *proute = node->mt_partition_tuple_routing;
+ ResultRelInfo **resultRelInfos;
+ int num_partitions;
CHECK_FOR_INTERRUPTS();
@@ -2152,6 +2292,28 @@ ExecModifyTable(PlanState *pstate)
return slot;
}
+ /*
+ * Insert remaining tuples for batch insert.
+ */
+ if (proute)
+ resultRelInfos = ExecGetTouchedPartitions(proute, &num_partitions);
+ else
+ {
+ resultRelInfos = &resultRelInfo;
+ num_partitions = 1;
+ }
+
+ for (int i = 0; i < num_partitions; i++)
+ {
+ resultRelInfo = resultRelInfos[i];
+ if (resultRelInfo->ri_NumSlots > 0)
+ ExecBatchInsert(node, resultRelInfo,
+ resultRelInfo->ri_Slots,
+ resultRelInfo->ri_PlanSlots,
+ resultRelInfo->ri_NumSlots,
+ estate, node->canSetTag);
+ }
+
/*
* We're done, but fire AFTER STATEMENT triggers before exiting.
*/
diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c
index c4eba6b053..dbf6b30233 100644
--- a/src/backend/nodes/list.c
+++ b/src/backend/nodes/list.c
@@ -277,6 +277,21 @@ list_make4_impl(NodeTag t, ListCell datum1, ListCell datum2,
return list;
}
+List *
+list_make5_impl(NodeTag t, ListCell datum1, ListCell datum2,
+ ListCell datum3, ListCell datum4, ListCell datum5)
+{
+ List *list = new_list(t, 5);
+
+ list->elements[0] = datum1;
+ list->elements[1] = datum2;
+ list->elements[2] = datum3;
+ list->elements[3] = datum4;
+ list->elements[4] = datum5;
+ check_list_invariants(list);
+ return list;
+}
+
/*
* Make room for a new head cell in the given (non-NIL) list.
*
diff --git a/src/include/executor/execPartition.h b/src/include/executor/execPartition.h
index d30ffde7d9..2bb5a85fb1 100644
--- a/src/include/executor/execPartition.h
+++ b/src/include/executor/execPartition.h
@@ -125,5 +125,6 @@ extern PartitionPruneState *ExecCreatePartitionPruneState(PlanState *planstate,
extern Bitmapset *ExecFindMatchingSubPlans(PartitionPruneState *prunestate);
extern Bitmapset *ExecFindInitialMatchingSubPlans(PartitionPruneState *prunestate,
int nsubplans);
+extern ResultRelInfo **ExecGetTouchedPartitions(PartitionTupleRouting *proute, int *count);
#endif /* EXECPARTITION_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 2953499fb1..7946ca82f6 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -85,6 +85,14 @@ typedef TupleTableSlot *(*ExecForeignInsert_function) (EState *estate,
TupleTableSlot *slot,
TupleTableSlot *planSlot);
+typedef TupleTableSlot **(*ExecForeignBatchInsert_function) (EState *estate,
+ ResultRelInfo *rinfo,
+ TupleTableSlot **slots,
+ TupleTableSlot **planSlots,
+ int *numSlots);
+
+typedef int (*GetModifyBatchSize_function) (ResultRelInfo *rinfo);
+
typedef TupleTableSlot *(*ExecForeignUpdate_function) (EState *estate,
ResultRelInfo *rinfo,
TupleTableSlot *slot,
@@ -209,6 +217,8 @@ typedef struct FdwRoutine
PlanForeignModify_function PlanForeignModify;
BeginForeignModify_function BeginForeignModify;
ExecForeignInsert_function ExecForeignInsert;
+ ExecForeignBatchInsert_function ExecForeignBatchInsert;
+ GetModifyBatchSize_function GetModifyBatchSize;
ExecForeignUpdate_function ExecForeignUpdate;
ExecForeignDelete_function ExecForeignDelete;
EndForeignModify_function EndForeignModify;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 48c3f570fa..d65099c94a 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -446,6 +446,12 @@ typedef struct ResultRelInfo
/* true when modifying foreign table directly */
bool ri_usesFdwDirectModify;
+ /* batch insert stuff */
+ int ri_NumSlots; /* number of slots in the array */
+ int ri_BatchSize; /* max slots inserted in a single batch */
+ TupleTableSlot **ri_Slots; /* input tuples for batch insert */
+ TupleTableSlot **ri_PlanSlots;
+
/* list of WithCheckOption's to be checked */
List *ri_WithCheckOptions;
diff --git a/src/include/nodes/pg_list.h b/src/include/nodes/pg_list.h
index 710dcd37ef..404e03f132 100644
--- a/src/include/nodes/pg_list.h
+++ b/src/include/nodes/pg_list.h
@@ -213,6 +213,10 @@ list_length(const List *l)
#define list_make4(x1,x2,x3,x4) \
list_make4_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \
list_make_ptr_cell(x3), list_make_ptr_cell(x4))
+#define list_make5(x1,x2,x3,x4,x5) \
+ list_make5_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \
+ list_make_ptr_cell(x3), list_make_ptr_cell(x4), \
+ list_make_ptr_cell(x5))
#define list_make1_int(x1) \
list_make1_impl(T_IntList, list_make_int_cell(x1))
@@ -224,6 +228,10 @@ list_length(const List *l)
#define list_make4_int(x1,x2,x3,x4) \
list_make4_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \
list_make_int_cell(x3), list_make_int_cell(x4))
+#define list_make5_int(x1,x2,x3,x4,x5) \
+ list_make5_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \
+ list_make_int_cell(x3), list_make_int_cell(x4), \
+ list_make_int_cell(x5))
#define list_make1_oid(x1) \
list_make1_impl(T_OidList, list_make_oid_cell(x1))
@@ -235,6 +243,10 @@ list_length(const List *l)
#define list_make4_oid(x1,x2,x3,x4) \
list_make4_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \
list_make_oid_cell(x3), list_make_oid_cell(x4))
+#define list_make5_oid(x1,x2,x3,x4,x5) \
+ list_make5_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \
+ list_make_oid_cell(x3), list_make_oid_cell(x4), \
+ list_make_oid_cell(x5))
/*
* Locate the n'th cell (counting from 0) of the list.
@@ -520,6 +532,9 @@ extern List *list_make3_impl(NodeTag t, ListCell datum1, ListCell datum2,
ListCell datum3);
extern List *list_make4_impl(NodeTag t, ListCell datum1, ListCell datum2,
ListCell datum3, ListCell datum4);
+extern List *list_make5_impl(NodeTag t, ListCell datum1, ListCell datum2,
+ ListCell datum3, ListCell datum4,
+ ListCell datum5);
extern pg_nodiscard List *lappend(List *list, void *datum);
extern pg_nodiscard List *lappend_int(List *list, int datum);