diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 7bf35602b0..32c2eacf7a 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8533,6 +8533,7 @@ drop table loct2;
 -- ===================================================================
 -- test COPY FROM
 -- ===================================================================
+alter server loopback options (add batch_size '2');
 create table loc2 (f1 int, f2 text);
 alter table loc2 set (autovacuum_enabled = 'false');
 create foreign table rem2 (f1 int, f2 text) server loopback options(table_name 'loc2');
@@ -8555,7 +8556,7 @@ copy rem2 from stdin; -- ERROR
 ERROR:  new row for relation "loc2" violates check constraint "loc2_f1positive"
 DETAIL:  Failing row contains (-1, xyzzy).
 CONTEXT:  remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2)
-COPY rem2, line 1: "-1	xyzzy"
+COPY rem2
 select * from rem2;
  f1 | f2  
 ----+-----
@@ -8566,6 +8567,19 @@ select * from rem2;
 alter foreign table rem2 drop constraint rem2_f1positive;
 alter table loc2 drop constraint loc2_f1positive;
 delete from rem2;
+create table foo (a int) partition by list (a);
+create table foo1 (like foo);
+create foreign table ffoo1 partition of foo for values in (1)
+	server loopback options (table_name 'foo1');
+create table foo2 (like foo);
+create foreign table ffoo2 partition of foo for values in (2)
+	server loopback options (table_name 'foo2');
+create function print_new_row() returns trigger language plpgsql as $$
+	begin raise notice '%', new; return new; end; $$;
+create trigger ffoo1_br_trig before insert on ffoo1
+	for each row execute function print_new_row();
+copy foo from stdin;
+NOTICE:  (1)
 -- Test local triggers
 create trigger trig_stmt_before before insert on rem2
 	for each statement execute procedure trigger_func();
@@ -8674,6 +8688,34 @@ drop trigger rem2_trig_row_before on rem2;
 drop trigger rem2_trig_row_after on rem2;
 drop trigger loc2_trig_row_before_insert on loc2;
 delete from rem2;
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+ERROR:  column "f1" of relation "loc2" does not exist
+CONTEXT:  remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2), ($3, $4)
+COPY rem2
+alter table loc2 add column f1 int;
+alter table loc2 add column f2 int;
+select * from rem2;
+ f1 | f2 
+----+----
+(0 rows)
+
+-- dropped columns locally and on the foreign server
+alter table rem2 drop column f1;
+alter table rem2 drop column f2;
+copy rem2 from stdin;
+select * from rem2;
+--
+(2 rows)
+
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+select * from rem2;
+--
+(4 rows)
+
 -- test COPY FROM with foreign table created in the same transaction
 create table loc3 (f1 int, f2 text);
 begin;
@@ -8690,6 +8732,7 @@ select * from rem3;
 
 drop foreign table rem3;
 drop table loc3;
+alter server loopback options (drop batch_size);
 -- ===================================================================
 -- test for TRUNCATE
 -- ===================================================================
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 16320170ce..7d18c70f1a 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -2059,6 +2059,15 @@ postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo)
 		  resultRelInfo->ri_TrigDesc->trig_insert_after_row)))
 		return 1;
 
+	/*
+	 * If the foreign table has no columns, disable batching as the INSERT
+	 * syntax doesn't allow batching multiple empty rows into a zero-column
+	 * table.  This isn't needed in case of INSERT, but is in case of COPY.
+	 * Note that in the latter case fmstate must be non-NULL.
+	 */
+	if (fmstate && list_length(fmstate->target_attrs) == 0)
+		return 1;
+
 	/*
 	 * Otherwise use the batch size specified for server/table. The number of
 	 * parameters in a batch is limited to 65535 (uint16), so make sure we
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 42735ae78a..b0743098d4 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2368,6 +2368,7 @@ drop table loct2;
 -- test COPY FROM
 -- ===================================================================
 
+alter server loopback options (add batch_size '2');
 create table loc2 (f1 int, f2 text);
 alter table loc2 set (autovacuum_enabled = 'false');
 create foreign table rem2 (f1 int, f2 text) server loopback options(table_name 'loc2');
@@ -2400,6 +2401,23 @@ alter table loc2 drop constraint loc2_f1positive;
 
 delete from rem2;
 
+create table foo (a int) partition by list (a);
+create table foo1 (like foo);
+create foreign table ffoo1 partition of foo for values in (1)
+	server loopback options (table_name 'foo1');
+create table foo2 (like foo);
+create foreign table ffoo2 partition of foo for values in (2)
+	server loopback options (table_name 'foo2');
+create function print_new_row() returns trigger language plpgsql as $$
+	begin raise notice '%', new; return new; end; $$;
+create trigger ffoo1_br_trig before insert on ffoo1
+	for each row execute function print_new_row();
+
+copy foo from stdin;
+1
+2
+\.
+
 -- Test local triggers
 create trigger trig_stmt_before before insert on rem2
 	for each statement execute procedure trigger_func();
@@ -2500,6 +2518,34 @@ drop trigger loc2_trig_row_before_insert on loc2;
 
 delete from rem2;
 
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+1	foo
+2	bar
+\.
+
+alter table loc2 add column f1 int;
+alter table loc2 add column f2 int;
+select * from rem2;
+
+-- dropped columns locally and on the foreign server
+alter table rem2 drop column f1;
+alter table rem2 drop column f2;
+copy rem2 from stdin;
+
+
+\.
+select * from rem2;
+
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+
+
+\.
+select * from rem2;
+
 -- test COPY FROM with foreign table created in the same transaction
 create table loc3 (f1 int, f2 text);
 begin;
@@ -2513,6 +2559,7 @@ commit;
 select * from rem3;
 drop foreign table rem3;
 drop table loc3;
+alter server loopback options (drop batch_size);
 
 -- ===================================================================
 -- test for TRUNCATE
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index a976008b3d..ced3b53191 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -116,6 +116,13 @@ CopyFromErrorCallback(void *arg)
 {
 	CopyFromState cstate = (CopyFromState) arg;
 
+	if (cstate->relname_only)
+	{
+		errcontext("COPY %s",
+				   cstate->cur_relname);
+		return;
+	}
+
 	if (cstate->opts.binary)
 	{
 		/* can't usefully display the data */
@@ -222,7 +229,7 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
 	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
 	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
 	buffer->resultRelInfo = rri;
-	buffer->bistate = GetBulkInsertState();
+	buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
 	buffer->nused = 0;
 
 	return buffer;
@@ -299,83 +306,162 @@ CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
  */
 static inline void
 CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
-						   CopyMultiInsertBuffer *buffer)
+						   CopyMultiInsertBuffer *buffer,
+						   int64 *processed)
 {
-	MemoryContext oldcontext;
-	int			i;
-	uint64		save_cur_lineno;
 	CopyFromState cstate = miinfo->cstate;
 	EState	   *estate = miinfo->estate;
-	CommandId	mycid = miinfo->mycid;
-	int			ti_options = miinfo->ti_options;
-	bool		line_buf_valid = cstate->line_buf_valid;
 	int			nused = buffer->nused;
 	ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
 	TupleTableSlot **slots = buffer->slots;
+	int			i;
 
-	/*
-	 * Print error context information correctly, if one of the operations
-	 * below fails.
-	 */
-	cstate->line_buf_valid = false;
-	save_cur_lineno = cstate->cur_lineno;
+	if (resultRelInfo->ri_FdwRoutine)
+	{
+		int			batch_size = resultRelInfo->ri_BatchSize;
+		int			sent = 0;
 
-	/*
-	 * table_multi_insert may leak memory, so switch to short-lived memory
-	 * context before calling it.
-	 */
-	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	table_multi_insert(resultRelInfo->ri_RelationDesc,
-					   slots,
-					   nused,
-					   mycid,
-					   ti_options,
-					   buffer->bistate);
-	MemoryContextSwitchTo(oldcontext);
+		Assert(batch_size > 1);
 
-	for (i = 0; i < nused; i++)
-	{
 		/*
-		 * If there are any indexes, update them for all the inserted tuples,
-		 * and run AFTER ROW INSERT triggers.
+		 * We suppress error context information other than the relation name,
+		 * if one of the operations below fails.
 		 */
-		if (resultRelInfo->ri_NumIndices > 0)
+		Assert(!cstate->relname_only);
+		cstate->relname_only = true;
+
+		while (sent < nused)
 		{
-			List	   *recheckIndexes;
-
-			cstate->cur_lineno = buffer->linenos[i];
-			recheckIndexes =
-				ExecInsertIndexTuples(resultRelInfo,
-									  buffer->slots[i], estate, false, false,
-									  NULL, NIL);
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], recheckIndexes,
-								 cstate->transition_capture);
-			list_free(recheckIndexes);
+			int			size = (batch_size < nused - sent) ? batch_size : (nused - sent);
+			int			inserted = size;
+			TupleTableSlot **rslots;
+
+			/* Batch insert into foreign table */
+			Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
+			rslots =
+				resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
+																	 resultRelInfo,
+																	 &slots[sent],
+																	 NULL,
+																	 &inserted);
+
+			/* If any rows were inserted, run AFTER ROW INSERT triggers. */
+			if (inserted > 0 &&
+				resultRelInfo->ri_TrigDesc != NULL &&
+				(resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+				 resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+			{
+				for (i = 0; i < inserted; i++)
+				{
+					TupleTableSlot *slot = rslots[i];
+
+					/*
+					 * AFTER ROW Triggers might reference the tableoid column,
+					 * so (re-)initialize tts_tableOid before evaluating them.
+					 */
+					slot->tts_tableOid =
+						RelationGetRelid(resultRelInfo->ri_RelationDesc);
+
+					ExecARInsertTriggers(estate, resultRelInfo,
+										 slot, NIL,
+										 cstate->transition_capture);
+				}
+			}
+
+			sent += size;
+
+			/* Update the row counter and progress of the COPY command */
+			if (inserted > 0)
+			{
+				*processed += inserted;
+				pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+											 *processed);
+			}
 		}
 
+		for (i = 0; i < nused; i++)
+			ExecClearTuple(slots[i]);
+
+		/* reset relname_only */
+		cstate->relname_only = false;
+	}
+	else
+	{
+		CommandId	mycid = miinfo->mycid;
+		int			ti_options = miinfo->ti_options;
+		bool		line_buf_valid = cstate->line_buf_valid;
+		uint64		save_cur_lineno = cstate->cur_lineno;
+		MemoryContext oldcontext;
+
 		/*
-		 * There's no indexes, but see if we need to run AFTER ROW INSERT
-		 * triggers anyway.
+		 * Print error context information correctly, if one of the operations
+		 * below fails.
 		 */
-		else if (resultRelInfo->ri_TrigDesc != NULL &&
-				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+		cstate->line_buf_valid = false;
+
+		/*
+		 * table_multi_insert may leak memory, so switch to short-lived memory
+		 * context before calling it.
+		 */
+		oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+		table_multi_insert(resultRelInfo->ri_RelationDesc,
+						   slots,
+						   nused,
+						   mycid,
+						   ti_options,
+						   buffer->bistate);
+		MemoryContextSwitchTo(oldcontext);
+
+		for (i = 0; i < nused; i++)
 		{
-			cstate->cur_lineno = buffer->linenos[i];
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], NIL, cstate->transition_capture);
+			/*
+			 * If there are any indexes, update them for all the inserted
+			 * tuples, and run AFTER ROW INSERT triggers.
+			 */
+			if (resultRelInfo->ri_NumIndices > 0)
+			{
+				List	   *recheckIndexes;
+
+				cstate->cur_lineno = buffer->linenos[i];
+				recheckIndexes =
+					ExecInsertIndexTuples(resultRelInfo,
+										  buffer->slots[i], estate, false,
+										  false, NULL, NIL);
+				ExecARInsertTriggers(estate, resultRelInfo,
+									 slots[i], recheckIndexes,
+									 cstate->transition_capture);
+				list_free(recheckIndexes);
+			}
+
+			/*
+			 * There's no indexes, but see if we need to run AFTER ROW INSERT
+			 * triggers anyway.
+			 */
+			else if (resultRelInfo->ri_TrigDesc != NULL &&
+					 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+					  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+			{
+				cstate->cur_lineno = buffer->linenos[i];
+				ExecARInsertTriggers(estate, resultRelInfo,
+									 slots[i], NIL,
+									 cstate->transition_capture);
+			}
+
+			ExecClearTuple(slots[i]);
 		}
 
-		ExecClearTuple(slots[i]);
+		/* Update the row counter and progress of the COPY command */
+		*processed += nused;
+		pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+									 *processed);
+
+		/* reset cur_lineno and line_buf_valid to what they were */
+		cstate->line_buf_valid = line_buf_valid;
+		cstate->cur_lineno = save_cur_lineno;
 	}
 
 	/* Mark that all slots are free */
 	buffer->nused = 0;
-
-	/* reset cur_lineno and line_buf_valid to what they were */
-	cstate->line_buf_valid = line_buf_valid;
-	cstate->cur_lineno = save_cur_lineno;
 }
 
 /*
@@ -387,22 +473,25 @@ static inline void
 CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
 							 CopyMultiInsertBuffer *buffer)
 {
+	ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
 	int			i;
 
 	/* Ensure buffer was flushed */
 	Assert(buffer->nused == 0);
 
 	/* Remove back-link to ourself */
-	buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
+	resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
 
-	FreeBulkInsertState(buffer->bistate);
+	if (resultRelInfo->ri_FdwRoutine == NULL)
+		FreeBulkInsertState(buffer->bistate);
 
 	/* Since we only create slots on demand, just drop the non-null ones. */
 	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
 		ExecDropSingleTupleTableSlot(buffer->slots[i]);
 
-	table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
-							 miinfo->ti_options);
+	if (resultRelInfo->ri_FdwRoutine == NULL)
+		table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
+								 miinfo->ti_options);
 
 	pfree(buffer);
 }
@@ -418,7 +507,8 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
  * 'curr_rri'.
  */
 static inline void
-CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
+CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
+						 int64 *processed)
 {
 	ListCell   *lc;
 
@@ -426,7 +516,7 @@ CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
 	{
 		CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
 
-		CopyMultiInsertBufferFlush(miinfo, buffer);
+		CopyMultiInsertBufferFlush(miinfo, buffer, processed);
 	}
 
 	miinfo->bufferedTuples = 0;
@@ -679,6 +769,23 @@ CopyFrom(CopyFromState cstate)
 		resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
 														 resultRelInfo);
 
+	/*
+	 * Also, if the named relation is a foreign table, determine if the FDW
+	 * supports batch insert and determine the batch size (a FDW may support
+	 * batching, but it may be disabled for the server/table).
+	 *
+	 * If the FDW does not support batching, we set the batch size to 1.
+	 */
+	if (resultRelInfo->ri_FdwRoutine != NULL &&
+		resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
+		resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
+		resultRelInfo->ri_BatchSize =
+			resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
+	else
+		resultRelInfo->ri_BatchSize = 1;
+
+	Assert(resultRelInfo->ri_BatchSize >= 1);
+
 	/* Prepare to catch AFTER triggers. */
 	AfterTriggerBeginQuery();
 
@@ -725,6 +832,15 @@ CopyFrom(CopyFromState cstate)
 		 */
 		insertMethod = CIM_SINGLE;
 	}
+	else if (resultRelInfo->ri_FdwRoutine != NULL &&
+			 resultRelInfo->ri_BatchSize == 1)
+	{
+		/*
+		 * Can't support multi-inserts to foreign tables if the FDW does not
+		 * support batching.
+		 */
+		insertMethod = CIM_SINGLE;
+	}
 	else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
 			 resultRelInfo->ri_TrigDesc->trig_insert_new_table)
 	{
@@ -737,14 +853,12 @@ CopyFrom(CopyFromState cstate)
 		 */
 		insertMethod = CIM_SINGLE;
 	}
-	else if (resultRelInfo->ri_FdwRoutine != NULL ||
-			 cstate->volatile_defexprs)
+	else if (cstate->volatile_defexprs)
 	{
 		/*
-		 * Can't support multi-inserts to foreign tables or if there are any
-		 * volatile default expressions in the table.  Similarly to the
-		 * trigger case above, such expressions may query the table we're
-		 * inserting into.
+		 * Can't support multi-inserts if there are any volatile default
+		 * expressions in the table.  Similarly to the trigger case above,
+		 * such expressions may query the table we're inserting into.
 		 *
 		 * Note: It does not matter if any partitions have any volatile
 		 * default expressions as we use the defaults from the target of the
@@ -910,12 +1024,14 @@ CopyFrom(CopyFromState cstate)
 
 				/*
 				 * Disable multi-inserts when the partition has BEFORE/INSTEAD
-				 * OF triggers, or if the partition is a foreign partition.
+				 * OF triggers, or if the partition is a foreign partition
+				 * that can't use batching.
 				 */
 				leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
 					!has_before_insert_row_trig &&
 					!has_instead_insert_row_trig &&
-					resultRelInfo->ri_FdwRoutine == NULL;
+					(resultRelInfo->ri_FdwRoutine == NULL ||
+					 resultRelInfo->ri_BatchSize > 1);
 
 				/* Set the multi-insert buffer to use for this partition. */
 				if (leafpart_use_multi_insert)
@@ -931,7 +1047,9 @@ CopyFrom(CopyFromState cstate)
 					 * Flush pending inserts if this partition can't use
 					 * batching, so rows are visible to triggers etc.
 					 */
-					CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+					CopyMultiInsertInfoFlush(&multiInsertInfo,
+											 resultRelInfo,
+											 &processed);
 				}
 
 				if (bistate != NULL)
@@ -1067,7 +1185,17 @@ CopyFrom(CopyFromState cstate)
 					 * buffers out to their tables.
 					 */
 					if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
-						CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+						CopyMultiInsertInfoFlush(&multiInsertInfo,
+												 resultRelInfo,
+												 &processed);
+
+					/*
+					 * We delay updating the row counter and the progress of
+					 * the COPY command until after writing the tuples stored
+					 * in the buffer out to the table.  See
+					 * CopyMultiInsertBufferFlush().
+					 */
+					continue;	/* next tuple please */
 				}
 				else
 				{
@@ -1130,7 +1258,7 @@ CopyFrom(CopyFromState cstate)
 	if (insertMethod != CIM_SINGLE)
 	{
 		if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
-			CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
+			CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
 	}
 
 	/* Done, clean up */
@@ -1349,6 +1477,7 @@ BeginCopyFrom(ParseState *pstate,
 	cstate->cur_lineno = 0;
 	cstate->cur_attname = NULL;
 	cstate->cur_attval = NULL;
+	cstate->relname_only = false;
 
 	/*
 	 * Allocate buffers for the input pipeline.
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index e37c6032ae..21e8b89baa 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -81,6 +81,7 @@ typedef struct CopyFromStateData
 	uint64		cur_lineno;		/* line number for error messages */
 	const char *cur_attname;	/* current att for error messages */
 	const char *cur_attval;		/* current att value for error messages */
+	bool		relname_only;	/* don't output line number, att, etc. */
 
 	/*
 	 * Working state
