On 1/14/21 2:57 PM, Amit Langote wrote:
> On Thu, Jan 14, 2021 at 21:57 Tomas Vondra <tomas.von...@enterprisedb.com>
> wrote:
> 
>     On 1/14/21 9:58 AM, Amit Langote wrote:
>     > Hi,
>     >
>     > On Thu, Jan 14, 2021 at 2:41 AM Tomas Vondra
>     > <tomas.von...@enterprisedb.com> wrote:
>     >> On 1/13/21 3:43 PM, Tomas Vondra wrote:
>     >>> Thanks for the report. Yeah, I think there's a missing check in
>     >>> ExecInsert. Adding
>     >>>
>     >>>   (!resultRelInfo->ri_TrigDesc->trig_insert_after_row)
>     >>>
>     >>> solves this. But now I'm wondering if this is the wrong place to make
>     >>> this decision. I mean, why should we make the decision here, when the
>     >>> decision whether to have a RETURNING clause is made in postgres_fdw in
>     >>> deparseReturningList? We don't really know what the other FDWs will do,
>     >>> for example.
>     >>>
>     >>> So I think we should just move all of this into GetModifyBatchSize. We
>     >>> can start with ri_BatchSize = 0. And then do
>     >>>
>     >>>   if (resultRelInfo->ri_BatchSize == 0)
>     >>>     resultRelInfo->ri_BatchSize =
>     >>>       resultRelInfo->ri_FdwRoutine->GetModifyBatchSize(resultRelInfo);
>     >>>
>     >>>   if (resultRelInfo->ri_BatchSize > 1)
>     >>>   {
>     >>>     ... do batching ...
>     >>>   }
>     >>>
>     >>> The GetModifyBatchSize would always return a value > 0, so either 1 (no
>     >>> batching) or >1 (batching).
>     >>>
>     >>
>     >> FWIW the attached v8 patch does this - most of the conditions are moved
>     >> to the GetModifyBatchSize() callback.
>     >
>     > Thanks.  A few comments:
>     >
>     > * I agree with leaving it up to an FDW to look at the properties of
>     > the table and of the operation being performed to decide whether or
>     > not to use batching, although maybe BeginForeignModify() is a better
>     > place for putting that logic instead of GetModifyBatchSize()?  So, in
>     > create_foreign_modify(), instead of PgFdwModifyState.batch_size simply
>     > being set to match the table's or the server's value for the
>     > batch_size option, make it also consider the things that prevent
>     > batching and set the execution state's batch_size based on that.
>     > GetModifyBatchSize() simply returns that value.
>     >
>     > * Regarding the timing of calling GetModifyBatchSize() to set
>     > ri_BatchSize, I wonder if it wouldn't be better to call it just once,
>     > say from ExecInitModifyTable(), right after BeginForeignModify()
>     > returns?  I don't quite understand why it is being called from
>     > ExecInsert().  Can the batch size change once the execution starts?
>     >
> 
>     But it should be called just once. The idea is that initially we have
>     batch_size=0, and the first call returns a value that is >= 1, so we
>     never call it again. But maybe it could be called from
>     BeginForeignModify, in which case we'd not need this logic of first
>     setting it to 0 etc.
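Here is a minimal standalone sketch of this scheme. `ri_BatchSize` starts at 0, the FDW is asked only once, and 1 means "no batching". This is not code from the patch. `FakeResultRelInfo`, `insert_row()`, `flush_batch()` and the two callbacks are hypothetical stand-ins for ResultRelInfo, ExecInsert(), the batch flush and GetModifyBatchSize().

```c
#include <assert.h>

/* Hypothetical stand-in for the ResultRelInfo fields involved. */
typedef struct FakeResultRelInfo
{
    int         ri_BatchSize;               /* 0 = FDW not asked yet */
    int         (*get_batch_size) (void);   /* models GetModifyBatchSize() */
    int         pending;                    /* rows buffered, not yet sent */
    int         round_trips;                /* remote statements executed */
    int         callback_calls;             /* times the callback was invoked */
} FakeResultRelInfo;

/* Hypothetical FDW callbacks: one batching FDW, one that opts out. */
static int  batch_of_ten(void) { return 10; }
static int  no_batching(void) { return 1; }

/* Send whatever is buffered as one multi-row INSERT. */
static void
flush_batch(FakeResultRelInfo *rri)
{
    if (rri->pending > 0)
    {
        rri->round_trips++;
        rri->pending = 0;
    }
}

/* Per-row entry point, loosely modelled on ExecInsert(). */
static void
insert_row(FakeResultRelInfo *rri)
{
    /* Ask the FDW only for the first row; it must answer >= 1. */
    if (rri->ri_BatchSize == 0)
    {
        rri->ri_BatchSize = rri->get_batch_size();
        rri->callback_calls++;
        assert(rri->ri_BatchSize >= 1);
    }

    if (rri->ri_BatchSize > 1)
    {
        rri->pending++;
        if (rri->pending == rri->ri_BatchSize)
            flush_batch(rri);
    }
    else
        rri->round_trips++;     /* no batching: one statement per row */
}
```

Suppose the batch size is 10, 21 rows are inserted, and the batch is flushed once at the end of the statement. That gives three remote statements (10 + 10 + 1) and exactly one call to the callback.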
> 
> 
> Right, although I was thinking that maybe ri_BatchSize itself is not to
> be written to by the FDW.  Not to say that’s doing anything wrong though.
> 
>     > * Lastly, how about calling it GetForeignModifyBatchSize() to be
>     > consistent with other nearby callbacks?
>     >
> 
>     Yeah, good point.
> 
>     >> I've removed the check for the BatchInsert callback, though - the FDW
>     >> knows whether it supports that, and it seems a bit pointless at the
>     >> moment as there are no other batch callbacks. Maybe we should add an
>     >> Assert somewhere, though?
>     >
>     > Hmm, not checking whether BatchInsert() exists may not be a good idea,
>     > because if an FDW's GetModifyBatchSize() returns a value > 1 but
>     > there's no BatchInsert() function to call, ExecBatchInsert() would
>     > trip.  I don't see the newly added documentation telling FDW authors
>     > to either define both or none.
>     >
> 
>     Hmm. The BatchInsert check seemed somewhat unnecessary to me, but OTOH
>     it can't hurt, I guess. I'll add it back.
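One way to make the "define both or none" rule safe in the executor is sketched below. It assumes that a missing batch-insert callback should simply disable batching rather than raise an error. `FakeFdwRoutine` and `effective_batch_size()` are hypothetical. Their member names only mirror the callbacks discussed in this thread.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, trimmed-down model of the FdwRoutine members involved. */
typedef struct FakeFdwRoutine
{
    int         (*GetForeignModifyBatchSize) (void);
    int         (*ExecForeignBatchInsert) (int nrows);  /* returns rows inserted */
} FakeFdwRoutine;

/*
 * Decide the effective batch size.  Batching needs both callbacks; if the
 * batch-insert callback is missing, fall back to 1 (single-row inserts)
 * instead of failing later when the buffered batch is flushed.
 */
static int
effective_batch_size(const FakeFdwRoutine *routine)
{
    int         size;

    if (routine->GetForeignModifyBatchSize == NULL ||
        routine->ExecForeignBatchInsert == NULL)
        return 1;

    size = routine->GetForeignModifyBatchSize();
    assert(size >= 1);          /* callback contract discussed above */
    return size;
}

/* Hypothetical callbacks used to exercise the check. */
static int  report_ten(void) { return 10; }
static int  insert_all(int nrows) { return nrows; }
```

Raising an error, or asserting, when only one of the two callbacks is defined would be an equally valid policy. Silently falling back is just one choice.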
> 
>     > Regarding how this plays with partitions, I don't think we need
>     > ExecGetTouchedPartitions(), because you can get the routed-to
>     > partitions using es_tuple_routing_result_relations.  Also, perhaps
> 
>     I'm not very familiar with es_tuple_routing_result_relations, but that
>     doesn't seem to work. I've replaced the flushing code at the end of
>     ExecModifyTable with a loop over es_tuple_routing_result_relations, but
>     then some of the rows are missing (i.e. not flushed).
> 
> 
> I should’ve mentioned es_opened_result_relations too, which contains the
> non-routing result relations.  So I really meant: if (proute), use
> es_tuple_routing_result_relations, else es_opened_result_relations. 
> This should work as long as batching is only used for inserts.
> 

Ah, right. That did the trick.
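The end-of-statement flush can be sketched as follows. This is a simplified model, assuming batching is used only for INSERT. It picks es_tuple_routing_result_relations when tuple routing is active and es_opened_result_relations otherwise, then flushes whatever each relation still has buffered. `FakeRel`, `FakeEState` and `flush_pending_batches()` are hypothetical. In the executor the lists hold ResultRelInfo pointers, and the flush calls the FDW's batch-insert callback.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, much-simplified stand-ins for executor state. */
typedef struct FakeRel
{
    int         pending;        /* buffered rows not yet sent */
} FakeRel;

typedef struct FakeRelList
{
    FakeRel   **rels;
    int         n;
} FakeRelList;

typedef struct FakeEState
{
    FakeRelList tuple_routing_result_relations; /* es_tuple_routing_result_relations */
    FakeRelList opened_result_relations;        /* es_opened_result_relations */
} FakeEState;

/* Flush leftover batches at the end of the statement; returns rows sent. */
static int
flush_pending_batches(FakeEState *estate, int has_proute)
{
    FakeRelList *list = has_proute ? &estate->tuple_routing_result_relations
                                   : &estate->opened_result_relations;
    int         flushed = 0;

    for (int i = 0; i < list->n; i++)
    {
        FakeRel    *rel = list->rels[i];

        assert(rel != NULL);
        flushed += rel->pending;    /* stands in for the FDW batch insert */
        rel->pending = 0;
    }
    return flushed;
}
```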

Attached is v9 with all of those tweaks, except for moving the BatchSize
call to BeginForeignModify. I tried that, but it did not seem like an
improvement, because we'd still need the checks for the API callbacks in
ExecInsert, for example. So I decided not to do that.
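This sketch shows the parameter layout used by the batched path in v9. convert_prep_stmt_params() fills one flat p_values array of p_nums * numSlots entries, row after row. Parameter $k of row i (rows counted from 0) therefore ends up at index i * p_nums + (k - 1). The output function, however, is picked by column position alone (j in the patch, which restarts for every slot). `flatten_params()`, `OutputFunc` and the two formatters are hypothetical. Int values and snprintf() stand in for datums and type output functions.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical per-column output function, standing in for p_flinfo[j]. */
typedef void (*OutputFunc) (int value, char *buf, size_t len);

static void out_plain(int value, char *buf, size_t len) { snprintf(buf, len, "%d", value); }
static void out_hex(int value, char *buf, size_t len) { snprintf(buf, len, "0x%x", (unsigned) value); }

/*
 * Flatten nrows x ncols values into text parameters, row-major: row i,
 * column j becomes $(i * ncols + j + 1), stored at index i * ncols + j.
 * isnull[] marks SQL NULLs (stored as NULL pointers).  The caller frees
 * every string and then the array itself.
 */
static char **
flatten_params(const int *values, const int *isnull, int nrows, int ncols,
               const OutputFunc *out_funcs)
{
    char      **p_values = malloc(sizeof(char *) * (size_t) nrows * ncols);
    int         pindex = 0;

    assert(p_values != NULL);
    for (int i = 0; i < nrows; i++)
    {
        /* column index restarts for every row, like j in the patch */
        for (int j = 0; j < ncols; j++)
        {
            if (isnull[pindex])
                p_values[pindex] = NULL;
            else
            {
                p_values[pindex] = malloc(16);  /* enough for these formats */
                assert(p_values[pindex] != NULL);
                out_funcs[j](values[pindex], p_values[pindex], 16);
            }
            pindex++;
        }
    }
    assert(pindex == nrows * ncols);
    return p_values;
}
```

The total parameter count passed to the prepared statement is then p_nums * numSlots, which matches the PQsendQueryPrepared() call in the patch.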


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From 825640430a5882edba7d9a0e21960e29922815ec Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@2ndquadrant.com>
Date: Thu, 14 Jan 2021 15:59:49 +0100
Subject: [PATCH] v9

---
 contrib/postgres_fdw/deparse.c                |  43 ++-
 .../postgres_fdw/expected/postgres_fdw.out    | 116 ++++++-
 contrib/postgres_fdw/option.c                 |  14 +
 contrib/postgres_fdw/postgres_fdw.c           | 298 ++++++++++++++----
 contrib/postgres_fdw/postgres_fdw.h           |   5 +-
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  91 ++++++
 doc/src/sgml/fdwhandler.sgml                  |  89 +++++-
 doc/src/sgml/postgres-fdw.sgml                |  13 +
 src/backend/executor/nodeModifyTable.c        | 160 ++++++++++
 src/backend/nodes/list.c                      |  15 +
 src/include/foreign/fdwapi.h                  |  10 +
 src/include/nodes/execnodes.h                 |   6 +
 src/include/nodes/pg_list.h                   |  15 +
 13 files changed, 809 insertions(+), 66 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 3cf7b4eb1e..2d38ab25cb 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -1711,7 +1711,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 				 Index rtindex, Relation rel,
 				 List *targetAttrs, bool doNothing,
 				 List *withCheckOptionList, List *returningList,
-				 List **retrieved_attrs)
+				 List **retrieved_attrs, int *values_end_len)
 {
 	AttrNumber	pindex;
 	bool		first;
@@ -1754,6 +1754,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 	}
 	else
 		appendStringInfoString(buf, " DEFAULT VALUES");
+	*values_end_len = buf->len;
 
 	if (doNothing)
 		appendStringInfoString(buf, " ON CONFLICT DO NOTHING");
@@ -1763,6 +1764,46 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 						 withCheckOptionList, returningList, retrieved_attrs);
 }
 
+/*
+ * Rebuild the remote INSERT statement, appending num_rows additional
+ * parameterized records to the VALUES clause of orig_query.
+ */
+void
+rebuildInsertSql(StringInfo buf, char *orig_query,
+				 int values_end_len, int num_cols,
+				 int num_rows)
+{
+	int			i, j;
+	int			pindex;
+	bool		first;
+
+	/* Copy up to the end of the first record from the original query */
+	appendBinaryStringInfo(buf, orig_query, values_end_len);
+
+	/* Add records to VALUES clause */
+	pindex = num_cols + 1;
+	for (i = 0; i < num_rows; i++)
+	{
+		appendStringInfoString(buf, ", (");
+
+		first = true;
+		for (j = 0; j < num_cols; j++)
+		{
+			if (!first)
+				appendStringInfoString(buf, ", ");
+			first = false;
+
+			appendStringInfo(buf, "$%d", pindex);
+			pindex++;
+		}
+
+		appendStringInfoChar(buf, ')');
+	}
+
+	/* Copy stuff after VALUES clause from the original query */
+	appendStringInfoString(buf, orig_query + values_end_len);
+}
+
 /*
  * deparse remote UPDATE statement
  *
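This standalone sketch shows what rebuildInsertSql() produces. It uses a fixed-size char buffer instead of StringInfo, so it compiles outside the backend. `rebuild_insert_sql()` is a hypothetical name. As in the patch, num_rows counts the records appended after the first one (execute_foreign_modify() passes *numSlots - 1). Anything after values_end_len, such as ON CONFLICT DO NOTHING, is copied unchanged.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical plain-C version of rebuildInsertSql().  Copies the query up
 * to the end of the first VALUES record, appends num_rows more "($n, ...)"
 * records, then copies the rest of the original query.  Returns 0 on
 * success, -1 if buf is too small.
 */
static int
rebuild_insert_sql(char *buf, size_t bufsize, const char *orig_query,
                   size_t values_end_len, int num_cols, int num_rows)
{
    size_t      len;
    int         pindex = num_cols + 1;  /* first parameter after record 1 */
    int         n;

    assert(values_end_len <= strlen(orig_query));
    assert(num_cols > 0);

    if (values_end_len >= bufsize)
        return -1;
    memcpy(buf, orig_query, values_end_len);
    len = values_end_len;

    for (int i = 0; i < num_rows; i++)
    {
        for (int j = 0; j < num_cols; j++)
        {
            n = snprintf(buf + len, bufsize - len, "%s$%d",
                         j == 0 ? ", (" : ", ", pindex++);
            if (n < 0 || (size_t) n >= bufsize - len)
                return -1;
            len += n;
        }
        if (len + 1 >= bufsize)
            return -1;
        buf[len++] = ')';
    }

    /* Copy everything after the VALUES clause (ON CONFLICT etc.) */
    n = snprintf(buf + len, bufsize - len, "%s", orig_query + values_end_len);
    if (n < 0 || (size_t) n >= bufsize - len)
        return -1;
    return 0;
}
```

Consider "INSERT INTO t(a, b) VALUES ($1, $2) ON CONFLICT DO NOTHING" with num_cols = 2 and num_rows = 2. The result is "INSERT INTO t(a, b) VALUES ($1, $2), ($3, $4), ($5, $6) ON CONFLICT DO NOTHING".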
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index c11092f8cc..96bad17ded 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8911,7 +8911,7 @@ DO $d$
     END;
 $d$;
 ERROR:  invalid option "password"
-HINT:  Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size
+HINT:  Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size
 CONTEXT:  SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')"
 PL/pgSQL function inline_code_block line 3 at EXECUTE
 -- If we add a password for our user mapping instead, we should get a different
@@ -9053,3 +9053,117 @@ SELECT 1 FROM ft1 LIMIT 1;
 ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off');
 -- The invalid connection gets closed in pgfdw_xact_callback during commit.
 COMMIT;
+-- ===================================================================
+-- batch insert
+-- ===================================================================
+BEGIN;
+CREATE SERVER batch10 FOREIGN DATA WRAPPER postgres_fdw OPTIONS( batch_size '10' );
+SELECT count(*)
+FROM pg_foreign_server
+WHERE srvname = 'batch10'
+AND srvoptions @> array['batch_size=10'];
+ count 
+-------
+     1
+(1 row)
+
+ALTER SERVER batch10 OPTIONS( SET batch_size '20' );
+SELECT count(*)
+FROM pg_foreign_server
+WHERE srvname = 'batch10'
+AND srvoptions @> array['batch_size=10'];
+ count 
+-------
+     0
+(1 row)
+
+SELECT count(*)
+FROM pg_foreign_server
+WHERE srvname = 'batch10'
+AND srvoptions @> array['batch_size=20'];
+ count 
+-------
+     1
+(1 row)
+
+CREATE FOREIGN TABLE table30 ( x int ) SERVER batch10 OPTIONS ( batch_size '30' );
+SELECT COUNT(*)
+FROM pg_foreign_table
+WHERE ftrelid = 'table30'::regclass
+AND ftoptions @> array['batch_size=30'];
+ count 
+-------
+     1
+(1 row)
+
+ALTER FOREIGN TABLE table30 OPTIONS ( SET batch_size '40');
+SELECT COUNT(*)
+FROM pg_foreign_table
+WHERE ftrelid = 'table30'::regclass
+AND ftoptions @> array['batch_size=30'];
+ count 
+-------
+     0
+(1 row)
+
+SELECT COUNT(*)
+FROM pg_foreign_table
+WHERE ftrelid = 'table30'::regclass
+AND ftoptions @> array['batch_size=40'];
+ count 
+-------
+     1
+(1 row)
+
+ROLLBACK;
+CREATE TABLE batch_table ( x int );
+CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '10' );
+INSERT INTO ftable SELECT * FROM generate_series(1, 10) i;
+INSERT INTO ftable SELECT * FROM generate_series(11, 31) i;
+INSERT INTO ftable VALUES (32);
+INSERT INTO ftable VALUES (33), (34);
+SELECT COUNT(*) FROM ftable;
+ count 
+-------
+    34
+(1 row)
+
+TRUNCATE batch_table;
+DROP FOREIGN TABLE ftable;
+-- Disable batch insert
+CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '1' );
+INSERT INTO ftable VALUES (1), (2);
+SELECT COUNT(*) FROM ftable;
+ count 
+-------
+     2
+(1 row)
+
+DROP FOREIGN TABLE ftable;
+DROP TABLE batch_table;
+-- Use partitioning
+CREATE TABLE batch_table ( x int ) PARTITION BY HASH (x);
+CREATE TABLE batch_table_p0 (LIKE batch_table);
+CREATE FOREIGN TABLE batch_table_p0f
+	PARTITION OF batch_table
+	FOR VALUES WITH (MODULUS 3, REMAINDER 0)
+	SERVER loopback
+	OPTIONS (table_name 'batch_table_p0', batch_size '10');
+CREATE TABLE batch_table_p1 (LIKE batch_table);
+CREATE FOREIGN TABLE batch_table_p1f
+	PARTITION OF batch_table
+	FOR VALUES WITH (MODULUS 3, REMAINDER 1)
+	SERVER loopback
+	OPTIONS (table_name 'batch_table_p1', batch_size '1');
+CREATE TABLE batch_table_p2
+	PARTITION OF batch_table
+	FOR VALUES WITH (MODULUS 3, REMAINDER 2);
+INSERT INTO batch_table SELECT * FROM generate_series(1, 66) i;
+SELECT COUNT(*) FROM batch_table;
+ count 
+-------
+    66
+(1 row)
+
+-- Clean up
+DROP TABLE batch_table CASCADE;
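The test above sets batch_size at both the server and the foreign-table level. create_foreign_modify() resolves it in this order: the table option first, then the server option, then a default of 100. Below is a minimal standalone sketch of that precedence. `Option`, `lookup_batch_size()` and `resolve_batch_size()` are hypothetical stand-ins for walking the DefElem option lists. Like the patch, it uses 0 to mean "not set", which relies on the validator rejecting 0.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for one DefElem in an options list. */
typedef struct Option
{
    const char *name;
    const char *value;
} Option;

#define DEFAULT_BATCH_SIZE 100  /* default used by the patch */

/* Return the batch_size value from an option list, or 0 if absent. */
static int
lookup_batch_size(const Option *opts, int nopts)
{
    for (int i = 0; i < nopts; i++)
    {
        if (strcmp(opts[i].name, "batch_size") == 0)
            return (int) strtol(opts[i].value, NULL, 10);
    }
    return 0;
}

/* Table option wins over server option; otherwise use the default. */
static int
resolve_batch_size(const Option *table_opts, int ntable,
                   const Option *server_opts, int nserver)
{
    int         batch_size = lookup_batch_size(table_opts, ntable);

    if (batch_size == 0)
        batch_size = lookup_batch_size(server_opts, nserver);
    if (batch_size == 0)
        batch_size = DEFAULT_BATCH_SIZE;

    assert(batch_size > 0);
    return batch_size;
}
```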
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 1fec3c3eea..64698c4da3 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -142,6 +142,17 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
 						 errmsg("%s requires a non-negative integer value",
 								def->defname)));
 		}
+		else if (strcmp(def->defname, "batch_size") == 0)
+		{
+			int			batch_size;
+
+			batch_size = strtol(defGetString(def), NULL, 10);
+			if (batch_size <= 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("%s requires a positive integer value",
+								def->defname)));
+		}
 		else if (strcmp(def->defname, "password_required") == 0)
 		{
 			bool		pw_required = defGetBoolean(def);
@@ -203,6 +214,9 @@ InitPgFdwOptions(void)
 		/* fetch_size is available on both server and table */
 		{"fetch_size", ForeignServerRelationId, false},
 		{"fetch_size", ForeignTableRelationId, false},
+		/* batch_size is available on both server and table */
+		{"batch_size", ForeignServerRelationId, false},
+		{"batch_size", ForeignTableRelationId, false},
 		{"password_required", UserMappingRelationId, false},
 
 		/*
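The validator above parses the value with strtol(..., NULL, 10) and only checks for values <= 0. As a result, input such as '10abc' would be accepted as 10, and 'abc' is rejected only because it happens to parse as 0. Below is a sketch of a stricter check in plain C. `parse_positive_int()` is a hypothetical helper, not part of the patch.

```c
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdlib.h>

/*
 * Hypothetical strict parser for a batch_size option value: the whole
 * string must be a decimal integer in 1..INT_MAX.  Returns 1 and stores
 * the value in *result on success, 0 otherwise (leaving *result as is).
 */
static int
parse_positive_int(const char *str, int *result)
{
    char       *end;
    long        val;

    assert(str != NULL && result != NULL);

    errno = 0;
    val = strtol(str, &end, 10);
    if (end == str || *end != '\0')     /* no digits, or trailing junk */
        return 0;
    if (errno == ERANGE || val <= 0 || val > INT_MAX)
        return 0;

    *result = (int) val;
    return 1;
}
```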
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 2f2d4d171c..85a072bc88 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -87,8 +87,10 @@ enum FdwScanPrivateIndex
  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
  * 2) Integer list of target attribute numbers for INSERT/UPDATE
  *	  (NIL for a DELETE)
- * 3) Boolean flag showing if the remote query has a RETURNING clause
- * 4) Integer list of attribute numbers retrieved by RETURNING, if any
+ * 3) Length up to the end of the VALUES clause for INSERT
+ *	  (-1 for UPDATE/DELETE)
+ * 4) Boolean flag showing if the remote query has a RETURNING clause
+ * 5) Integer list of attribute numbers retrieved by RETURNING, if any
  */
 enum FdwModifyPrivateIndex
 {
@@ -96,6 +98,8 @@ enum FdwModifyPrivateIndex
 	FdwModifyPrivateUpdateSql,
 	/* Integer list of target attribute numbers for INSERT/UPDATE */
 	FdwModifyPrivateTargetAttnums,
+	/* Length up to the end of the VALUES clause (as an integer Value node) */
+	FdwModifyPrivateLen,
 	/* has-returning flag (as an integer Value node) */
 	FdwModifyPrivateHasReturning,
 	/* Integer list of attribute numbers retrieved by RETURNING */
@@ -176,7 +180,10 @@ typedef struct PgFdwModifyState
 
 	/* extracted fdw_private data */
 	char	   *query;			/* text of INSERT/UPDATE/DELETE command */
+	char	   *orig_query;		/* original text of INSERT command */
 	List	   *target_attrs;	/* list of target attribute numbers */
+	int			values_end;		/* length up to the end of VALUES */
+	int			batch_size;		/* value of FDW option "batch_size" */
 	bool		has_returning;	/* is there a RETURNING clause? */
 	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
 
@@ -185,6 +192,9 @@ typedef struct PgFdwModifyState
 	int			p_nums;			/* number of parameters to transmit */
 	FmgrInfo   *p_flinfo;		/* output conversion functions for them */
 
+	/* batch operation stuff */
+	int			num_slots;		/* number of rows the current query is built for */
+
 	/* working memory context */
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
 
@@ -343,6 +353,12 @@ static TupleTableSlot *postgresExecForeignInsert(EState *estate,
 												 ResultRelInfo *resultRelInfo,
 												 TupleTableSlot *slot,
 												 TupleTableSlot *planSlot);
+static TupleTableSlot **postgresExecForeignBatchInsert(EState *estate,
+												 ResultRelInfo *resultRelInfo,
+												 TupleTableSlot **slots,
+												 TupleTableSlot **planSlots,
+												 int *numSlots);
+static int	postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo);
 static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
 												 ResultRelInfo *resultRelInfo,
 												 TupleTableSlot *slot,
@@ -429,20 +445,24 @@ static PgFdwModifyState *create_foreign_modify(EState *estate,
 											   Plan *subplan,
 											   char *query,
 											   List *target_attrs,
+											   int len,
 											   bool has_returning,
 											   List *retrieved_attrs);
-static TupleTableSlot *execute_foreign_modify(EState *estate,
+static TupleTableSlot **execute_foreign_modify(EState *estate,
 											  ResultRelInfo *resultRelInfo,
 											  CmdType operation,
-											  TupleTableSlot *slot,
-											  TupleTableSlot *planSlot);
+											  TupleTableSlot **slots,
+											  TupleTableSlot **planSlots,
+											  int *numSlots);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
 											 ItemPointer tupleid,
-											 TupleTableSlot *slot);
+											 TupleTableSlot **slots,
+											 int numSlots);
 static void store_returning_result(PgFdwModifyState *fmstate,
 								   TupleTableSlot *slot, PGresult *res);
 static void finish_foreign_modify(PgFdwModifyState *fmstate);
+static void deallocate_query(PgFdwModifyState *fmstate);
 static List *build_remote_returning(Index rtindex, Relation rel,
 									List *returningList);
 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
@@ -530,6 +550,8 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	routine->PlanForeignModify = postgresPlanForeignModify;
 	routine->BeginForeignModify = postgresBeginForeignModify;
 	routine->ExecForeignInsert = postgresExecForeignInsert;
+	routine->ExecForeignBatchInsert = postgresExecForeignBatchInsert;
+	routine->GetForeignModifyBatchSize = postgresGetForeignModifyBatchSize;
 	routine->ExecForeignUpdate = postgresExecForeignUpdate;
 	routine->ExecForeignDelete = postgresExecForeignDelete;
 	routine->EndForeignModify = postgresEndForeignModify;
@@ -1665,6 +1687,7 @@ postgresPlanForeignModify(PlannerInfo *root,
 	List	   *returningList = NIL;
 	List	   *retrieved_attrs = NIL;
 	bool		doNothing = false;
+	int			values_end_len = -1;
 
 	initStringInfo(&sql);
 
@@ -1752,7 +1775,7 @@ postgresPlanForeignModify(PlannerInfo *root,
 			deparseInsertSql(&sql, rte, resultRelation, rel,
 							 targetAttrs, doNothing,
 							 withCheckOptionList, returningList,
-							 &retrieved_attrs);
+							 &retrieved_attrs, &values_end_len);
 			break;
 		case CMD_UPDATE:
 			deparseUpdateSql(&sql, rte, resultRelation, rel,
@@ -1776,8 +1799,9 @@ postgresPlanForeignModify(PlannerInfo *root,
 	 * Build the fdw_private list that will be available to the executor.
 	 * Items in the list must match enum FdwModifyPrivateIndex, above.
 	 */
-	return list_make4(makeString(sql.data),
+	return list_make5(makeString(sql.data),
 					  targetAttrs,
+					  makeInteger(values_end_len),
 					  makeInteger((retrieved_attrs != NIL)),
 					  retrieved_attrs);
 }
@@ -1797,6 +1821,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 	char	   *query;
 	List	   *target_attrs;
 	bool		has_returning;
+	int			values_end_len;
 	List	   *retrieved_attrs;
 	RangeTblEntry *rte;
 
@@ -1812,6 +1837,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 							FdwModifyPrivateUpdateSql));
 	target_attrs = (List *) list_nth(fdw_private,
 									 FdwModifyPrivateTargetAttnums);
+	values_end_len = intVal(list_nth(fdw_private,
+									FdwModifyPrivateLen));
 	has_returning = intVal(list_nth(fdw_private,
 									FdwModifyPrivateHasReturning));
 	retrieved_attrs = (List *) list_nth(fdw_private,
@@ -1829,6 +1856,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 									mtstate->mt_plans[subplan_index]->plan,
 									query,
 									target_attrs,
+									values_end_len,
 									has_returning,
 									retrieved_attrs);
 
@@ -1846,7 +1874,8 @@ postgresExecForeignInsert(EState *estate,
 						  TupleTableSlot *planSlot)
 {
 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
-	TupleTableSlot *rslot;
+	TupleTableSlot **rslot;
+	int 			numSlots = 1;
 
 	/*
 	 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
@@ -1855,7 +1884,36 @@ postgresExecForeignInsert(EState *estate,
 	if (fmstate->aux_fmstate)
 		resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
 	rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
-								   slot, planSlot);
+								   &slot, &planSlot, &numSlots);
+	/* Revert that change */
+	if (fmstate->aux_fmstate)
+		resultRelInfo->ri_FdwState = fmstate;
+
+	return rslot ? *rslot : NULL;
+}
+
+/*
+ * postgresExecForeignBatchInsert
+ *		Insert multiple rows into a foreign table
+ */
+static TupleTableSlot **
+postgresExecForeignBatchInsert(EState *estate,
+						  ResultRelInfo *resultRelInfo,
+						  TupleTableSlot **slots,
+						  TupleTableSlot **planSlots,
+						  int *numSlots)
+{
+	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+	TupleTableSlot **rslot;
+
+	/*
+	 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
+	 * postgresBeginForeignInsert())
+	 */
+	if (fmstate->aux_fmstate)
+		resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
+	rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
+								   slots, planSlots, numSlots);
 	/* Revert that change */
 	if (fmstate->aux_fmstate)
 		resultRelInfo->ri_FdwState = fmstate;
@@ -1863,6 +1921,23 @@ postgresExecForeignInsert(EState *estate,
 	return rslot;
 }
 
+/*
+ * postgresGetForeignModifyBatchSize
+ *		Report the maximum number of tuples that can be inserted in bulk
+ */
+static int
+postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo)
+{
+	/* Disable batching when RETURNING is needed (explicitly or for AFTER ROW triggers). */
+	if (resultRelInfo->ri_projectReturning != NULL ||
+		(resultRelInfo->ri_TrigDesc &&
+		 resultRelInfo->ri_TrigDesc->trig_insert_after_row))
+		return 1;
+
+	/* Otherwise use the batch size specified for server/table. */
+	return ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->batch_size;
+}
+
 /*
  * postgresExecForeignUpdate
  *		Update one row in a foreign table
@@ -1873,8 +1948,13 @@ postgresExecForeignUpdate(EState *estate,
 						  TupleTableSlot *slot,
 						  TupleTableSlot *planSlot)
 {
-	return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
-								  slot, planSlot);
+	TupleTableSlot **rslot;
+	int 			numSlots = 1;
+
+	rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
+								  &slot, &planSlot, &numSlots);
+
+	return rslot ? rslot[0] : NULL;
 }
 
 /*
@@ -1887,8 +1967,13 @@ postgresExecForeignDelete(EState *estate,
 						  TupleTableSlot *slot,
 						  TupleTableSlot *planSlot)
 {
-	return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
-								  slot, planSlot);
+	TupleTableSlot **rslot;
+	int 			numSlots = 1;
+
+	rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
+								  &slot, &planSlot, &numSlots);
+
+	return rslot ? rslot[0] : NULL;
 }
 
 /*
@@ -1925,6 +2010,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
 	RangeTblEntry *rte;
 	TupleDesc	tupdesc = RelationGetDescr(rel);
 	int			attnum;
+	int			values_end_len;
 	StringInfoData sql;
 	List	   *targetAttrs = NIL;
 	List	   *retrieved_attrs = NIL;
@@ -2001,7 +2087,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
 	deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
 					 resultRelInfo->ri_WithCheckOptions,
 					 resultRelInfo->ri_returningList,
-					 &retrieved_attrs);
+					 &retrieved_attrs, &values_end_len);
 
 	/* Construct an execution state. */
 	fmstate = create_foreign_modify(mtstate->ps.state,
@@ -2011,6 +2097,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
 									NULL,
 									sql.data,
 									targetAttrs,
+									values_end_len,
 									retrieved_attrs != NIL,
 									retrieved_attrs);
 
@@ -2636,6 +2723,9 @@ postgresExplainForeignModify(ModifyTableState *mtstate,
 										  FdwModifyPrivateUpdateSql));
 
 		ExplainPropertyText("Remote SQL", sql, es);
+
+		if (rinfo->ri_BatchSize > 0)
+			ExplainPropertyInteger("Batch Size", NULL, rinfo->ri_BatchSize, es);
 	}
 }
 
@@ -3530,6 +3620,7 @@ create_foreign_modify(EState *estate,
 					  Plan *subplan,
 					  char *query,
 					  List *target_attrs,
+					  int values_end,
 					  bool has_returning,
 					  List *retrieved_attrs)
 {
@@ -3538,6 +3629,7 @@ create_foreign_modify(EState *estate,
 	TupleDesc	tupdesc = RelationGetDescr(rel);
 	Oid			userid;
 	ForeignTable *table;
+	ForeignServer *server;
 	UserMapping *user;
 	AttrNumber	n_params;
 	Oid			typefnoid;
@@ -3564,7 +3656,10 @@ create_foreign_modify(EState *estate,
 
 	/* Set up remote query information. */
 	fmstate->query = query;
+	if (operation == CMD_INSERT)
+		fmstate->orig_query = pstrdup(fmstate->query);
 	fmstate->target_attrs = target_attrs;
+	fmstate->values_end = values_end;
 	fmstate->has_returning = has_returning;
 	fmstate->retrieved_attrs = retrieved_attrs;
 
@@ -3616,6 +3711,44 @@ create_foreign_modify(EState *estate,
 
 	Assert(fmstate->p_nums <= n_params);
 
+	/* Set batch_size from foreign server/table options. */
+	if (operation == CMD_INSERT)
+	{
+		/* Check the foreign table option. */
+		foreach(lc, table->options)
+		{
+			DefElem    *def = (DefElem *) lfirst(lc);
+
+			if (strcmp(def->defname, "batch_size") == 0)
+			{
+				fmstate->batch_size = strtol(defGetString(def), NULL, 10);
+				break;
+			}
+		}
+
+		/* Check the foreign server option if the table option is not set. */
+		if (fmstate->batch_size == 0)
+		{
+			server = GetForeignServer(table->serverid);
+			foreach(lc, server->options)
+			{
+				DefElem    *def = (DefElem *) lfirst(lc);
+
+				if (strcmp(def->defname, "batch_size") == 0)
+				{
+					fmstate->batch_size = strtol(defGetString(def), NULL, 10);
+					break;
+				}
+			}
+		}
+
+		/* If neither the table nor server option is set, set the default. */
+		if (fmstate->batch_size == 0)
+			fmstate->batch_size = 100;
+	}
+
+	fmstate->num_slots = 1;
+
 	/* Initialize auxiliary state */
 	fmstate->aux_fmstate = NULL;
 
@@ -3626,26 +3759,50 @@ create_foreign_modify(EState *estate,
  * execute_foreign_modify
  *		Perform foreign-table modification as required, and fetch RETURNING
  *		result if any.  (This is the shared guts of postgresExecForeignInsert,
- *		postgresExecForeignUpdate, and postgresExecForeignDelete.)
+ *		postgresExecForeignBatchInsert, postgresExecForeignUpdate, and
+ *		postgresExecForeignDelete.)
  */
-static TupleTableSlot *
+static TupleTableSlot **
 execute_foreign_modify(EState *estate,
 					   ResultRelInfo *resultRelInfo,
 					   CmdType operation,
-					   TupleTableSlot *slot,
-					   TupleTableSlot *planSlot)
+					   TupleTableSlot **slots,
+					   TupleTableSlot **planSlots,
+					   int *numSlots)
 {
 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
 	ItemPointer ctid = NULL;
 	const char **p_values;
 	PGresult   *res;
 	int			n_rows;
+	StringInfoData sql;
 
 	/* The operation should be INSERT, UPDATE, or DELETE */
 	Assert(operation == CMD_INSERT ||
 		   operation == CMD_UPDATE ||
 		   operation == CMD_DELETE);
 
+	/*
+	 * If the existing query was deparsed and prepared for a different number
+	 * of rows, rebuild it for the proper number.
+	 */
+	if (operation == CMD_INSERT && fmstate->num_slots != *numSlots)
+	{
+		/* Destroy the prepared statement created previously */
+		if (fmstate->p_name)
+			deallocate_query(fmstate);
+
+		/*
+		 * Build INSERT string with numSlots records in its VALUES clause.
+		 */
+		initStringInfo(&sql);
+		rebuildInsertSql(&sql, fmstate->orig_query, fmstate->values_end,
+						 fmstate->p_nums, *numSlots - 1);
+		pfree(fmstate->query);
+		fmstate->query = sql.data;
+		fmstate->num_slots = *numSlots;
+	}
+
 	/* Set up the prepared statement on the remote server, if we didn't yet */
 	if (!fmstate->p_name)
 		prepare_foreign_modify(fmstate);
@@ -3658,7 +3815,7 @@ execute_foreign_modify(EState *estate,
 		Datum		datum;
 		bool		isNull;
 
-		datum = ExecGetJunkAttribute(planSlot,
+		datum = ExecGetJunkAttribute(planSlots[0],
 									 fmstate->ctidAttno,
 									 &isNull);
 		/* shouldn't ever get a null result... */
@@ -3668,14 +3825,14 @@ execute_foreign_modify(EState *estate,
 	}
 
 	/* Convert parameters needed by prepared statement to text form */
-	p_values = convert_prep_stmt_params(fmstate, ctid, slot);
+	p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots);
 
 	/*
 	 * Execute the prepared statement.
 	 */
 	if (!PQsendQueryPrepared(fmstate->conn,
 							 fmstate->p_name,
-							 fmstate->p_nums,
+							 fmstate->p_nums * (*numSlots),
 							 p_values,
 							 NULL,
 							 NULL,
@@ -3696,9 +3853,10 @@ execute_foreign_modify(EState *estate,
 	/* Check number of rows affected, and fetch RETURNING tuple if any */
 	if (fmstate->has_returning)
 	{
+		Assert(*numSlots == 1);
 		n_rows = PQntuples(res);
 		if (n_rows > 0)
-			store_returning_result(fmstate, slot, res);
+			store_returning_result(fmstate, slots[0], res);
 	}
 	else
 		n_rows = atoi(PQcmdTuples(res));
@@ -3708,10 +3866,12 @@ execute_foreign_modify(EState *estate,
 
 	MemoryContextReset(fmstate->temp_cxt);
 
+	*numSlots = n_rows;
+
 	/*
 	 * Return NULL if nothing was inserted/updated/deleted on the remote end
 	 */
-	return (n_rows > 0) ? slot : NULL;
+	return (n_rows > 0) ? slots : NULL;
 }
 
 /*
@@ -3771,52 +3931,64 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 static const char **
 convert_prep_stmt_params(PgFdwModifyState *fmstate,
 						 ItemPointer tupleid,
-						 TupleTableSlot *slot)
+						 TupleTableSlot **slots,
+						 int numSlots)
 {
 	const char **p_values;
+	int			i;
+	int			j;
 	int			pindex = 0;
 	MemoryContext oldcontext;
 
 	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
 
-	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
+	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots);
+
+	/* ctid is provided only for UPDATE/DELETE, which don't allow batching */
+	Assert(!(tupleid != NULL && numSlots > 1));
 
 	/* 1st parameter should be ctid, if it's in use */
 	if (tupleid != NULL)
 	{
+		Assert(numSlots == 1);
 		/* don't need set_transmission_modes for TID output */
 		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
 											  PointerGetDatum(tupleid));
 		pindex++;
 	}
 
-	/* get following parameters from slot */
-	if (slot != NULL && fmstate->target_attrs != NIL)
+	/* get following parameters from slots */
+	if (slots != NULL && fmstate->target_attrs != NIL)
 	{
 		int			nestlevel;
 		ListCell   *lc;
 
 		nestlevel = set_transmission_modes();
 
-		foreach(lc, fmstate->target_attrs)
+		for (i = 0; i < numSlots; i++)
 		{
-			int			attnum = lfirst_int(lc);
-			Datum		value;
-			bool		isnull;
+			j = (tupleid != NULL) ? 1 : 0;
+			foreach(lc, fmstate->target_attrs)
+			{
+				int			attnum = lfirst_int(lc);
+				Datum		value;
+				bool		isnull;
 
-			value = slot_getattr(slot, attnum, &isnull);
-			if (isnull)
-				p_values[pindex] = NULL;
-			else
-				p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
-													  value);
-			pindex++;
+				value = slot_getattr(slots[i], attnum, &isnull);
+				if (isnull)
+					p_values[pindex] = NULL;
+				else
+					p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j],
+														  value);
+				pindex++;
+				j++;
+			}
 		}
 
 		reset_transmission_modes(nestlevel);
 	}
 
-	Assert(pindex == fmstate->p_nums);
+	Assert(pindex == fmstate->p_nums * numSlots);
 
 	MemoryContextSwitchTo(oldcontext);
 
@@ -3870,29 +4042,41 @@ finish_foreign_modify(PgFdwModifyState *fmstate)
 	Assert(fmstate != NULL);
 
 	/* If we created a prepared statement, destroy it */
-	if (fmstate->p_name)
-	{
-		char		sql[64];
-		PGresult   *res;
-
-		snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
-
-		/*
-		 * We don't use a PG_TRY block here, so be careful not to throw error
-		 * without releasing the PGresult.
-		 */
-		res = pgfdw_exec_query(fmstate->conn, sql);
-		if (PQresultStatus(res) != PGRES_COMMAND_OK)
-			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
-		PQclear(res);
-		fmstate->p_name = NULL;
-	}
+	deallocate_query(fmstate);
 
 	/* Release remote connection */
 	ReleaseConnection(fmstate->conn);
 	fmstate->conn = NULL;
 }
 
+/*
+ * deallocate_query
+ *		Deallocate a prepared statement for a foreign insert/update/delete
+ *		operation
+ */
+static void
+deallocate_query(PgFdwModifyState *fmstate)
+{
+	char		sql[64];
+	PGresult   *res;
+
+	/* do nothing if the query is not allocated */
+	if (!fmstate->p_name)
+		return;
+
+	snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
+
+	/*
+	 * We don't use a PG_TRY block here, so be careful not to throw error
+	 * without releasing the PGresult.
+	 */
+	res = pgfdw_exec_query(fmstate->conn, sql);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+	PQclear(res);
+	fmstate->p_name = NULL;
+}
+
 /*
  * build_remote_returning
  *		Build a RETURNING targetlist of a remote query for performing an
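Review aside on the new convert_prep_stmt_params() logic. The parameter array is now laid out row by row, with p_nums values per slot. The output-function index j restarts for every row, while pindex keeps counting across rows. The self-contained C sketch below mimics that layout. Its names (output_fn, int_out, flatten_params) are hypothetical stand-ins, not postgres_fdw code. It also ignores the leading ctid parameter, because UPDATE/DELETE are never batched.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for one column's output function (p_flinfo[j]). */
typedef char *(*output_fn) (long value);

/* Hypothetical converter: render an integer value as text. */
static char *
int_out(long value)
{
    char *buf = malloc(32);

    if (buf != NULL)
        snprintf(buf, 32, "%ld", value);
    return buf;
}

/*
 * Mock of the patched loop: values/isnull hold num_rows * num_cols entries
 * in row-major order.  Row i, column j ends up at p_values[i * num_cols + j],
 * and column j is always converted with outfuncs[j].  NULL means SQL NULL.
 */
static const char **
flatten_params(const long *values, const bool *isnull,
               const output_fn *outfuncs, int num_cols, int num_rows)
{
    const char **p_values = malloc(sizeof(char *) * (size_t) (num_cols * num_rows));
    int pindex = 0;

    if (p_values == NULL)
        return NULL;

    for (int i = 0; i < num_rows; i++)
    {
        /* like 'j' in the patch: restart at the first column for each row */
        for (int j = 0; j < num_cols; j++)
        {
            p_values[pindex] = isnull[pindex] ? NULL : outfuncs[j](values[pindex]);
            pindex++;
        }
    }

    assert(pindex == num_cols * num_rows);
    return p_values;
}
```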
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 19ea27a1bc..1f67b4d9fd 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -161,7 +161,10 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs, bool doNothing,
 							 List *withCheckOptionList, List *returningList,
-							 List **retrieved_attrs);
+							 List **retrieved_attrs, int *values_end_len);
+extern void rebuildInsertSql(StringInfo buf, char *orig_query,
+							 int values_end_len, int num_cols,
+							 int num_rows);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs,
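This part of the patch only declares rebuildInsertSql(). Judging from its parameters, it presumably does three things. It copies the single-row INSERT up to the end of its first VALUES group (values_end_len). It then appends further "($k, ...)" groups so that num_rows rows of num_cols parameters are covered. Finally, it copies whatever followed the VALUES group. The C sketch below shows that idea under exactly these assumptions. rebuild_insert_sql and its fixed-buffer handling are hypothetical; the real function writes into a StringInfo.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical sketch: orig_query is a single-row INSERT whose first VALUES
 * group ends at offset values_end_len.  Extra parameter groups are spliced
 * in after it, and the tail (e.g. ON CONFLICT DO NOTHING) is copied back at
 * the end.  Returns the resulting length, or -1 if buf is too small.
 */
static int
rebuild_insert_sql(char *buf, size_t bufsize, const char *orig_query,
                   size_t values_end_len, int num_cols, int num_rows)
{
    size_t len = values_end_len;
    int pindex = num_cols + 1;  /* row 1 already uses $1..$num_cols */

    assert(values_end_len > 0 && values_end_len <= strlen(orig_query));
    if (values_end_len >= bufsize)
        return -1;
    memcpy(buf, orig_query, values_end_len);

    for (int i = 1; i < num_rows; i++)
    {
        for (int j = 0; j < num_cols; j++)
        {
            int n = snprintf(buf + len, bufsize - len, "%s$%d%s",
                             j == 0 ? ", (" : ", ", pindex++,
                             j == num_cols - 1 ? ")" : "");

            if (n < 0 || (size_t) n >= bufsize - len)
                return -1;
            len += (size_t) n;
        }
    }

    /* copy the rest of the original statement, including its terminator */
    if (strlen(orig_query + values_end_len) >= bufsize - len)
        return -1;
    strcpy(buf + len, orig_query + values_end_len);
    return (int) (len + strlen(orig_query + values_end_len));
}
```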
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 25dbc08b98..fd5abf2471 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2711,3 +2711,94 @@ SELECT 1 FROM ft1 LIMIT 1;
 ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off');
 -- The invalid connection gets closed in pgfdw_xact_callback during commit.
 COMMIT;
+
+-- ===================================================================
+-- batch insert
+-- ===================================================================
+
+BEGIN;
+
+CREATE SERVER batch10 FOREIGN DATA WRAPPER postgres_fdw OPTIONS( batch_size '10' );
+
+SELECT count(*)
+FROM pg_foreign_server
+WHERE srvname = 'batch10'
+AND srvoptions @> array['batch_size=10'];
+
+ALTER SERVER batch10 OPTIONS( SET batch_size '20' );
+
+SELECT count(*)
+FROM pg_foreign_server
+WHERE srvname = 'batch10'
+AND srvoptions @> array['batch_size=10'];
+
+SELECT count(*)
+FROM pg_foreign_server
+WHERE srvname = 'batch10'
+AND srvoptions @> array['batch_size=20'];
+
+CREATE FOREIGN TABLE table30 ( x int ) SERVER batch10 OPTIONS ( batch_size '30' );
+
+SELECT COUNT(*)
+FROM pg_foreign_table
+WHERE ftrelid = 'table30'::regclass
+AND ftoptions @> array['batch_size=30'];
+
+ALTER FOREIGN TABLE table30 OPTIONS ( SET batch_size '40');
+
+SELECT COUNT(*)
+FROM pg_foreign_table
+WHERE ftrelid = 'table30'::regclass
+AND ftoptions @> array['batch_size=30'];
+
+SELECT COUNT(*)
+FROM pg_foreign_table
+WHERE ftrelid = 'table30'::regclass
+AND ftoptions @> array['batch_size=40'];
+
+ROLLBACK;
+
+CREATE TABLE batch_table ( x int );
+
+CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '10' );
+INSERT INTO ftable SELECT * FROM generate_series(1, 10) i;
+INSERT INTO ftable SELECT * FROM generate_series(11, 31) i;
+INSERT INTO ftable VALUES (32);
+INSERT INTO ftable VALUES (33), (34);
+SELECT COUNT(*) FROM ftable;
+TRUNCATE batch_table;
+DROP FOREIGN TABLE ftable;
+
+-- Disable batch insert
+CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '1' );
+INSERT INTO ftable VALUES (1), (2);
+SELECT COUNT(*) FROM ftable;
+DROP FOREIGN TABLE ftable;
+DROP TABLE batch_table;
+
+-- Use partitioning
+CREATE TABLE batch_table ( x int ) PARTITION BY HASH (x);
+
+CREATE TABLE batch_table_p0 (LIKE batch_table);
+CREATE FOREIGN TABLE batch_table_p0f
+	PARTITION OF batch_table
+	FOR VALUES WITH (MODULUS 3, REMAINDER 0)
+	SERVER loopback
+	OPTIONS (table_name 'batch_table_p0', batch_size '10');
+
+CREATE TABLE batch_table_p1 (LIKE batch_table);
+CREATE FOREIGN TABLE batch_table_p1f
+	PARTITION OF batch_table
+	FOR VALUES WITH (MODULUS 3, REMAINDER 1)
+	SERVER loopback
+	OPTIONS (table_name 'batch_table_p1', batch_size '1');
+
+CREATE TABLE batch_table_p2
+	PARTITION OF batch_table
+	FOR VALUES WITH (MODULUS 3, REMAINDER 2);
+
+INSERT INTO batch_table SELECT * FROM generate_series(1, 66) i;
+SELECT COUNT(*) FROM batch_table;
+
+-- Clean up
+DROP TABLE batch_table CASCADE;
diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml
index 9c9293414c..854913ae5f 100644
--- a/doc/src/sgml/fdwhandler.sgml
+++ b/doc/src/sgml/fdwhandler.sgml
@@ -523,8 +523,9 @@ BeginForeignModify(ModifyTableState *mtstate,
      Begin executing a foreign table modification operation.  This routine is
      called during executor startup.  It should perform any initialization
      needed prior to the actual table modifications.  Subsequently,
-     <function>ExecForeignInsert</function>, <function>ExecForeignUpdate</function> or
-     <function>ExecForeignDelete</function> will be called for each tuple to be
+     <function>ExecForeignInsert</function> (or <function>ExecForeignBatchInsert</function>),
+     <function>ExecForeignUpdate</function> or
+     <function>ExecForeignDelete</function> will be called for each tuple (or batch of tuples) to be
      inserted, updated, or deleted.
     </para>
 
@@ -614,6 +615,81 @@ ExecForeignInsert(EState *estate,
 
     <para>
 <programlisting>
+TupleTableSlot **
+ExecForeignBatchInsert(EState *estate,
+                  ResultRelInfo *rinfo,
+                  TupleTableSlot **slots,
+                  TupleTableSlot **planSlots,
+                  int *numSlots);
+</programlisting>
+
+     Insert multiple tuples in bulk into the foreign table.
+     The parameters are the same as for <function>ExecForeignInsert</function>
+     except that <literal>slots</literal> and <literal>planSlots</literal> contain
+     multiple tuples and <literal>*numSlots</literal> specifies the number of
+     tuples in those arrays.
+    </para>
+
+    <para>
+     The return value is an array of slots containing the data that was
+     actually inserted (this might differ from the data supplied, for
+     example as a result of trigger actions.)
+     The passed-in <literal>slots</literal> can be re-used for this purpose.
+     The number of successfully inserted tuples is returned in
+     <literal>*numSlots</literal>.
+    </para>
+
+    <para>
+     The data in the returned slots is used only if the <command>INSERT</command>
+     statement involves a view
+     <literal>WITH CHECK OPTION</literal>; or if the foreign table has
+     an <literal>AFTER ROW</literal> trigger.  Triggers require all columns,
+     but the FDW could choose to optimize away returning some or all columns
+     depending on the contents of the
+     <literal>WITH CHECK OPTION</literal> constraints.
+    </para>
+
+    <para>
+     If the <function>ExecForeignBatchInsert</function> or
+     <function>GetForeignModifyBatchSize</function> pointer is set to
+     <literal>NULL</literal>, attempts to insert into the foreign table will
+     use <function>ExecForeignInsert</function>.
+     This function is not used if the <command>INSERT</command> has a
+     <literal>RETURNING</literal> clause.
+    </para>
+
+    <para>
+     Note that this function is also called when inserting routed tuples into
+     a foreign-table partition.  See the callback functions
+     described below that allow the FDW to support that.
+    </para>
+
+    <para>
+<programlisting>
+int
+GetForeignModifyBatchSize(ResultRelInfo *rinfo);
+</programlisting>
+
+     Report the maximum number of tuples that a single
+     <function>ExecForeignBatchInsert</function> call can handle for
+     the specified foreign table.  That is, the executor passes at most
+     the number of tuples that this function returns to
+     <function>ExecForeignBatchInsert</function>.
+     <literal>rinfo</literal> is the <structname>ResultRelInfo</structname> struct describing
+     the target foreign table.
+     The FDW is expected to let the user set this value through a foreign server
+     and/or foreign table option, or else to use a hard-coded value.
+    </para>
+
+    <para>
+     If the <function>ExecForeignBatchInsert</function> or
+     <function>GetForeignModifyBatchSize</function> pointer is set to
+     <literal>NULL</literal>, attempts to insert into the foreign table will
+     use <function>ExecForeignInsert</function>.
+    </para>
+
+    <para>
+<programlisting>
 TupleTableSlot *
 ExecForeignUpdate(EState *estate,
                   ResultRelInfo *rinfo,
@@ -741,8 +817,9 @@ BeginForeignInsert(ModifyTableState *mtstate,
      in both cases when it is the partition chosen for tuple routing and the
      target specified in a <command>COPY FROM</command> command.  It should
      perform any initialization needed prior to the actual insertion.
-     Subsequently, <function>ExecForeignInsert</function> will be called for
-     each tuple to be inserted into the foreign table.
+     Subsequently, <function>ExecForeignInsert</function> or
+     <function>ExecForeignBatchInsert</function> will be called for
+     each tuple (or batch of tuples) to be inserted into the foreign table.
     </para>
 
     <para>
@@ -773,8 +850,8 @@ BeginForeignInsert(ModifyTableState *mtstate,
     <para>
      Note that if the FDW does not support routable foreign-table partitions
      and/or executing <command>COPY FROM</command> on foreign tables, this
-     function or <function>ExecForeignInsert</function> subsequently called
-     must throw error as needed.
+     function or the subsequently called <function>ExecForeignInsert</function>
+     or <function>ExecForeignBatchInsert</function> must throw errors as needed.
     </para>
 
     <para>
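The fdwhandler.sgml text says the executor falls back to ExecForeignInsert when either batch callback is NULL. Below is a mock-based C sketch of the decision the patched ExecInsert() makes. MockFdwRoutine, MockRelInfo and resolve_batch_size are hypothetical stand-ins for FdwRoutine and ResultRelInfo. The sketch shows two things. First, the FDW is asked only once. Second, a result of 0 is treated as 1, meaning no batching.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical minimal stand-ins for ResultRelInfo and FdwRoutine. */
typedef struct MockRelInfo MockRelInfo;
typedef int (*batch_size_fn) (MockRelInfo *rinfo);
typedef int (*batch_insert_fn) (MockRelInfo *rinfo, int numSlots);

typedef struct MockFdwRoutine
{
    batch_insert_fn ExecForeignBatchInsert;     /* may be NULL */
    batch_size_fn GetForeignModifyBatchSize;    /* may be NULL */
} MockFdwRoutine;

struct MockRelInfo
{
    const MockFdwRoutine *fdwroutine;
    int batch_size;             /* 0 = not determined yet */
    int option_value;           /* what the mock FDW reports */
};

/*
 * Mirrors the patched ExecInsert(): call GetForeignModifyBatchSize only if
 * both batch callbacks exist and the size is still undetermined; a size
 * left at 0 becomes 1, which selects the per-row ExecForeignInsert path.
 */
static int
resolve_batch_size(MockRelInfo *rinfo)
{
    const MockFdwRoutine *r = rinfo->fdwroutine;

    if (r->GetForeignModifyBatchSize && r->ExecForeignBatchInsert &&
        rinfo->batch_size == 0)
        rinfo->batch_size = r->GetForeignModifyBatchSize(rinfo);

    if (rinfo->batch_size == 0)
        rinfo->batch_size = 1;

    assert(rinfo->batch_size >= 1);
    return rinfo->batch_size;
}

static int
mock_get_batch_size(MockRelInfo *rinfo)
{
    return rinfo->option_value;
}

static int
mock_batch_insert(MockRelInfo *rinfo, int numSlots)
{
    (void) rinfo;
    return numSlots;            /* pretend every tuple was inserted */
}
```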
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index e6fd2143c1..97eeb64a02 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -354,6 +354,19 @@ OPTIONS (ADD password_required 'false');
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><literal>batch_size</literal></term>
+     <listitem>
+      <para>
+       This option specifies the number of rows <filename>postgres_fdw</filename>
+       should insert in each insert operation. It can be specified for a
+       foreign table or a foreign server. The option specified on a table
+       overrides an option specified for the server.
+       The default is <literal>100</literal>.
+      </para>
+     </listitem>
+    </varlistentry>
+
    </variablelist>
 
   </sect3>
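The following C sketch shows how the batch_size option described in postgres-fdw.sgml might be resolved. A table option overrides the server option, and the default is 100. The sketch makes two assumptions. Options are taken as "name=value" strings, matching how they appear in srvoptions/ftoptions in the regression test. A missing or non-positive value is treated as unset. find_batch_size and effective_batch_size are hypothetical helpers, not postgres_fdw's option-handling code.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define DEFAULT_BATCH_SIZE 100  /* documented default */

/*
 * Look for "batch_size=N" in a NULL-terminated array of "name=value"
 * strings; returns N, or 0 if the option is absent (or the list is NULL).
 */
static int
find_batch_size(const char *const *options)
{
    static const char key[] = "batch_size=";

    for (; options != NULL && *options != NULL; options++)
    {
        if (strncmp(*options, key, sizeof(key) - 1) == 0)
            return atoi(*options + sizeof(key) - 1);
    }
    return 0;
}

/* The table option wins; then the server option; then the default. */
static int
effective_batch_size(const char *const *server_opts,
                     const char *const *table_opts)
{
    int size = find_batch_size(table_opts);

    if (size <= 0)
        size = find_batch_size(server_opts);
    return size > 0 ? size : DEFAULT_BATCH_SIZE;
}
```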
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 921e695419..fc48d8cb75 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -58,6 +58,13 @@
 #include "utils/rel.h"
 
 
+static void ExecBatchInsert(ModifyTableState *mtstate,
+								 ResultRelInfo *resultRelInfo,
+								 TupleTableSlot **slots,
+								 TupleTableSlot **planSlots,
+								 int numSlots,
+								 EState *estate,
+								 bool canSetTag);
 static bool ExecOnConflictUpdate(ModifyTableState *mtstate,
 								 ResultRelInfo *resultRelInfo,
 								 ItemPointer conflictTid,
@@ -389,6 +396,7 @@ ExecInsert(ModifyTableState *mtstate,
 	ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
 	OnConflictAction onconflict = node->onConflictAction;
 	PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
+	MemoryContext oldContext;
 
 	/*
 	 * If the input result relation is a partitioned table, find the leaf
@@ -441,6 +449,72 @@ ExecInsert(ModifyTableState *mtstate,
 			ExecComputeStoredGenerated(resultRelInfo, estate, slot,
 									   CMD_INSERT);
 
+		/*
+		 * Determine if the FDW supports batch insert and determine the batch
+		 * size (an FDW may support batching, but it may be disabled for the
+		 * server/table). Do this only once, at the beginning - we don't want
+		 * the batch size to change during execution.
+		 */
+		if (resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
+			resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert &&
+			resultRelInfo->ri_BatchSize == 0)
+			resultRelInfo->ri_BatchSize =
+				resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
+
+		if (resultRelInfo->ri_BatchSize == 0)
+			resultRelInfo->ri_BatchSize = 1;
+
+		Assert(resultRelInfo->ri_BatchSize >= 1);
+
+		/*
+		 * If the FDW supports batching, and batching is requested, accumulate
+		 * rows and insert them in batches. Otherwise use the per-row inserts.
+		 */
+		if (resultRelInfo->ri_BatchSize > 1)
+		{
+			/*
+			 * If the buffer already holds ri_BatchSize tuples, send them to
+			 * the FDW before accumulating this one.  Each ResultRelInfo has its
+			 * own buffer, so a batch never mixes tuples of different relations.
+			 */
+			if (resultRelInfo->ri_NumSlots == resultRelInfo->ri_BatchSize)
+			{
+				ExecBatchInsert(mtstate, resultRelInfo,
+							   resultRelInfo->ri_Slots,
+							   resultRelInfo->ri_PlanSlots,
+							   resultRelInfo->ri_NumSlots,
+							   estate, canSetTag);
+				resultRelInfo->ri_NumSlots = 0;
+			}
+
+			oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
+
+			if (resultRelInfo->ri_Slots == NULL)
+			{
+				resultRelInfo->ri_Slots = palloc(sizeof(TupleTableSlot *) *
+										   resultRelInfo->ri_BatchSize);
+				resultRelInfo->ri_PlanSlots = palloc(sizeof(TupleTableSlot *) *
+										   resultRelInfo->ri_BatchSize);
+			}
+
+			resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] =
+				MakeSingleTupleTableSlot(slot->tts_tupleDescriptor,
+										 slot->tts_ops);
+			ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots],
+						 slot);
+			resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] =
+				MakeSingleTupleTableSlot(planSlot->tts_tupleDescriptor,
+										 planSlot->tts_ops);
+			ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
+						 planSlot);
+
+			resultRelInfo->ri_NumSlots++;
+
+			MemoryContextSwitchTo(oldContext);
+
+			return NULL;
+		}
+
 		/*
 		 * insert into foreign table: let the FDW do it
 		 */
@@ -698,6 +772,70 @@ ExecInsert(ModifyTableState *mtstate,
 	return result;
 }
 
+/* ----------------------------------------------------------------
+ *		ExecBatchInsert
+ *
+ *		Insert multiple tuples in an efficient way.
+ *		Currently, this handles inserting into a foreign table without
+ *		RETURNING clause.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecBatchInsert(ModifyTableState *mtstate,
+		   ResultRelInfo *resultRelInfo,
+		   TupleTableSlot **slots,
+		   TupleTableSlot **planSlots,
+		   int numSlots,
+		   EState *estate,
+		   bool canSetTag)
+{
+	int			i;
+	int			numInserted = numSlots;
+	TupleTableSlot *slot = NULL;
+	TupleTableSlot **rslots;
+
+	/*
+	 * insert into foreign table: let the FDW do it
+	 */
+	rslots = resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
+																 resultRelInfo,
+																 slots,
+																 planSlots,
+																 &numInserted);
+
+	for (i = 0; i < numInserted; i++)
+	{
+		slot = rslots[i];
+
+		/*
+		 * AFTER ROW triggers might reference the tableoid column, so
+		 * (re-)initialize tts_tableOid before firing them.  (Batch insert
+		 * is not used when the INSERT has a RETURNING clause.)
+		 */
+		slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+
+		/* AFTER ROW INSERT Triggers */
+		ExecARInsertTriggers(estate, resultRelInfo, slot, NIL,
+							 mtstate->mt_transition_capture);
+
+		/*
+		 * Check any WITH CHECK OPTION constraints from parent views.  See the
+		 * comment in ExecInsert.
+		 */
+		if (resultRelInfo->ri_WithCheckOptions != NIL)
+			ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate);
+	}
+
+	if (canSetTag && numInserted > 0)
+		estate->es_processed += numInserted;
+
+	for (i = 0; i < numSlots; i++)
+	{
+		ExecDropSingleTupleTableSlot(slots[i]);
+		ExecDropSingleTupleTableSlot(planSlots[i]);
+	}
+}
+
 /* ----------------------------------------------------------------
  *		ExecDelete
  *
@@ -1937,6 +2075,9 @@ ExecModifyTable(PlanState *pstate)
 	ItemPointerData tuple_ctid;
 	HeapTupleData oldtupdata;
 	HeapTuple	oldtuple;
+	PartitionTupleRouting *proute = node->mt_partition_tuple_routing;
+	List	   *relinfos = NIL;
+	ListCell   *lc;
 
 	CHECK_FOR_INTERRUPTS();
 
@@ -2152,6 +2293,25 @@ ExecModifyTable(PlanState *pstate)
 			return slot;
 	}
 
+	/*
+	 * Insert remaining tuples for batch insert.
+	 */
+	if (proute)
+		relinfos = estate->es_tuple_routing_result_relations;
+	else
+		relinfos = estate->es_opened_result_relations;
+
+	foreach(lc, relinfos)
+	{
+		resultRelInfo = lfirst(lc);
+		if (resultRelInfo->ri_NumSlots > 0)
+			ExecBatchInsert(node, resultRelInfo,
+						   resultRelInfo->ri_Slots,
+						   resultRelInfo->ri_PlanSlots,
+						   resultRelInfo->ri_NumSlots,
+						   estate, node->canSetTag);
+	}
+
 	/*
 	 * We're done, but fire AFTER STATEMENT triggers before exiting.
 	 */
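The C simulation below makes the buffering in ExecInsert()/ExecModifyTable() concrete. simulate_batches is a hypothetical helper, not executor code. ExecInsert() flushes a full buffer only when the next tuple arrives. The final flush at the end of ExecModifyTable() is therefore what sends the last batch. With batch_size 10, the regression test's 21-row INSERT (generate_series(11, 31)) should produce batches of 10, 10 and 1.

```c
#include <assert.h>

/*
 * Simulate one INSERT of num_rows rows into a relation with the given
 * batch size.  batch_sizes[] receives the number of tuples handed to each
 * ExecForeignBatchInsert call; the return value is the number of calls.
 */
static int
simulate_batches(int num_rows, int batch_size, int *batch_sizes,
                 int max_batches)
{
    int num_slots = 0;
    int nbatches = 0;

    assert(batch_size > 1);     /* batch size 1 takes the per-row path */

    for (int row = 0; row < num_rows; row++)
    {
        /* ExecInsert: flush a full buffer before buffering this row */
        if (num_slots == batch_size)
        {
            assert(nbatches < max_batches);
            batch_sizes[nbatches++] = num_slots;
            num_slots = 0;
        }
        num_slots++;
    }

    /* ExecModifyTable: flush whatever is left once the plan is exhausted */
    if (num_slots > 0)
    {
        assert(nbatches < max_batches);
        batch_sizes[nbatches++] = num_slots;
    }
    return nbatches;
}
```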
diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c
index c4eba6b053..dbf6b30233 100644
--- a/src/backend/nodes/list.c
+++ b/src/backend/nodes/list.c
@@ -277,6 +277,21 @@ list_make4_impl(NodeTag t, ListCell datum1, ListCell datum2,
 	return list;
 }
 
+List *
+list_make5_impl(NodeTag t, ListCell datum1, ListCell datum2,
+				ListCell datum3, ListCell datum4, ListCell datum5)
+{
+	List	   *list = new_list(t, 5);
+
+	list->elements[0] = datum1;
+	list->elements[1] = datum2;
+	list->elements[2] = datum3;
+	list->elements[3] = datum4;
+	list->elements[4] = datum5;
+	check_list_invariants(list);
+	return list;
+}
+
 /*
  * Make room for a new head cell in the given (non-NIL) list.
  *
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 2953499fb1..248f78da45 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -85,6 +85,14 @@ typedef TupleTableSlot *(*ExecForeignInsert_function) (EState *estate,
 													   TupleTableSlot *slot,
 													   TupleTableSlot *planSlot);
 
+typedef TupleTableSlot **(*ExecForeignBatchInsert_function) (EState *estate,
+													   ResultRelInfo *rinfo,
+													   TupleTableSlot **slots,
+													   TupleTableSlot **planSlots,
+													   int *numSlots);
+
+typedef int (*GetForeignModifyBatchSize_function) (ResultRelInfo *rinfo);
+
 typedef TupleTableSlot *(*ExecForeignUpdate_function) (EState *estate,
 													   ResultRelInfo *rinfo,
 													   TupleTableSlot *slot,
@@ -209,6 +217,8 @@ typedef struct FdwRoutine
 	PlanForeignModify_function PlanForeignModify;
 	BeginForeignModify_function BeginForeignModify;
 	ExecForeignInsert_function ExecForeignInsert;
+	ExecForeignBatchInsert_function ExecForeignBatchInsert;
+	GetForeignModifyBatchSize_function GetForeignModifyBatchSize;
 	ExecForeignUpdate_function ExecForeignUpdate;
 	ExecForeignDelete_function ExecForeignDelete;
 	EndForeignModify_function EndForeignModify;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 48c3f570fa..d65099c94a 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -446,6 +446,12 @@ typedef struct ResultRelInfo
 	/* true when modifying foreign table directly */
 	bool		ri_usesFdwDirectModify;
 
+	/* batch insert stuff */
+	int			ri_NumSlots;		/* number of slots in the array */
+	int			ri_BatchSize;		/* max slots inserted in a single batch */
+	TupleTableSlot **ri_Slots;		/* input tuples for batch insert */
+	TupleTableSlot **ri_PlanSlots;	/* plan output tuples for batch insert */
+
 	/* list of WithCheckOption's to be checked */
 	List	   *ri_WithCheckOptions;
 
diff --git a/src/include/nodes/pg_list.h b/src/include/nodes/pg_list.h
index 710dcd37ef..404e03f132 100644
--- a/src/include/nodes/pg_list.h
+++ b/src/include/nodes/pg_list.h
@@ -213,6 +213,10 @@ list_length(const List *l)
 #define list_make4(x1,x2,x3,x4) \
 	list_make4_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \
 					list_make_ptr_cell(x3), list_make_ptr_cell(x4))
+#define list_make5(x1,x2,x3,x4,x5) \
+	list_make5_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \
+					list_make_ptr_cell(x3), list_make_ptr_cell(x4), \
+					list_make_ptr_cell(x5))
 
 #define list_make1_int(x1) \
 	list_make1_impl(T_IntList, list_make_int_cell(x1))
@@ -224,6 +228,10 @@ list_length(const List *l)
 #define list_make4_int(x1,x2,x3,x4) \
 	list_make4_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \
 					list_make_int_cell(x3), list_make_int_cell(x4))
+#define list_make5_int(x1,x2,x3,x4,x5) \
+	list_make5_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \
+					list_make_int_cell(x3), list_make_int_cell(x4), \
+					list_make_int_cell(x5))
 
 #define list_make1_oid(x1) \
 	list_make1_impl(T_OidList, list_make_oid_cell(x1))
@@ -235,6 +243,10 @@ list_length(const List *l)
 #define list_make4_oid(x1,x2,x3,x4) \
 	list_make4_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \
 					list_make_oid_cell(x3), list_make_oid_cell(x4))
+#define list_make5_oid(x1,x2,x3,x4,x5) \
+	list_make5_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \
+					list_make_oid_cell(x3), list_make_oid_cell(x4), \
+					list_make_oid_cell(x5))
 
 /*
  * Locate the n'th cell (counting from 0) of the list.
@@ -520,6 +532,9 @@ extern List *list_make3_impl(NodeTag t, ListCell datum1, ListCell datum2,
 							 ListCell datum3);
 extern List *list_make4_impl(NodeTag t, ListCell datum1, ListCell datum2,
 							 ListCell datum3, ListCell datum4);
+extern List *list_make5_impl(NodeTag t, ListCell datum1, ListCell datum2,
+							 ListCell datum3, ListCell datum4,
+							 ListCell datum5);
 
 extern pg_nodiscard List *lappend(List *list, void *datum);
 extern pg_nodiscard List *lappend_int(List *list, int datum);
-- 
2.26.2
