On Fri, 2023-12-29 at 15:22 -0800, Jeff Davis wrote:
> On Tue, 2023-09-05 at 12:08 -0700, Jeff Davis wrote:
> > OK, so we could have a built-in FDW called pg_connection that would
> > do
> > the right kinds of validation; and then also allow other FDWs but
> > the
> > subscription would have to do its own validation.
> 
> Attached a rough rebased version.

Attached a slightly better version which fixes a pg_dump issue and
improves the documentation.

Regards,
        Jeff Davis

From 0b8cb23157b86909d38cc10723f19d94787efed2 Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Wed, 23 Aug 2023 10:31:16 -0700
Subject: [PATCH v4] CREATE SUBSCRIPTION ... SERVER.

---
 doc/src/sgml/ref/alter_subscription.sgml      |  18 +-
 doc/src/sgml/ref/create_subscription.sgml     |  16 +-
 doc/src/sgml/user-manag.sgml                  |  12 +-
 src/backend/catalog/Makefile                  |   1 +
 src/backend/catalog/pg_subscription.c         |  17 +-
 src/backend/catalog/system_functions.sql      |   2 +
 src/backend/commands/subscriptioncmds.c       | 197 ++++++++++++++--
 src/backend/foreign/foreign.c                 | 214 ++++++++++++++++++
 src/backend/parser/gram.y                     |  20 ++
 src/backend/replication/logical/worker.c      |  12 +-
 src/bin/pg_dump/pg_dump.c                     |  63 +++++-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/describe.c                       |   2 +-
 src/bin/psql/tab-complete.c                   |   2 +-
 src/include/catalog/meson.build               |   1 +
 src/include/catalog/pg_authid.dat             |   5 +
 .../catalog/pg_foreign_data_wrapper.dat       |  22 ++
 src/include/catalog/pg_proc.dat               |   8 +
 src/include/catalog/pg_subscription.h         |   5 +-
 src/include/foreign/foreign.h                 |   1 +
 src/include/nodes/parsenodes.h                |   3 +
 src/test/regress/expected/foreign_data.out    |  60 ++++-
 src/test/regress/expected/subscription.out    |  38 ++++
 src/test/regress/sql/foreign_data.sql         |  41 +++-
 src/test/regress/sql/subscription.sql         |  39 ++++
 src/test/subscription/t/001_rep_changes.pl    |  57 +++++
 26 files changed, 804 insertions(+), 53 deletions(-)
 create mode 100644 src/include/catalog/pg_foreign_data_wrapper.dat

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 6d36ff0dc9..6d219145a9 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -21,6 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SERVER <replaceable>servername</replaceable>
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>'
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
@@ -94,13 +95,24 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
     </listitem>
    </varlistentry>
 
+   <varlistentry id="sql-altersubscription-params-server">
+    <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
+    <listitem>
+     <para>
+      This clause replaces the foreign server or connection string originally
+      set by <xref linkend="sql-createsubscription"/> with the foreign server
+      <replaceable>servername</replaceable>.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry id="sql-altersubscription-params-connection">
     <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
     <listitem>
      <para>
-      This clause replaces the connection string originally set by
-      <xref linkend="sql-createsubscription"/>.  See there for more
-      information.
+      This clause replaces the foreign server or connection string originally
+      set by <xref linkend="sql-createsubscription"/> with the connection
+      string <replaceable>conninfo</replaceable>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index f1c20b3a46..8cf67516cf 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -22,7 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceable>
-    CONNECTION '<replaceable class="parameter">conninfo</replaceable>'
+    { SERVER <replaceable class="parameter">servername</replaceable> | CONNECTION '<replaceable class="parameter">conninfo</replaceable>' }
     PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...]
     [ WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 </synopsis>
@@ -77,6 +77,15 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
     </listitem>
    </varlistentry>
 
+   <varlistentry id="sql-createsubscription-params-server">
+    <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
+    <listitem>
+     <para>
+      A foreign server to use for the connection.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry id="sql-createsubscription-params-connection">
     <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
     <listitem>
@@ -363,6 +372,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           The default is <literal>true</literal>. Only superusers can set
           this value to <literal>false</literal>.
          </para>
+         <para>
+          This parameter is only allowed with a connection string. With a
+          foreign server, specify <literal>password_required</literal> in the
+          user mapping for the foreign server instead.
+         </para>
         </listitem>
        </varlistentry>
 
diff --git a/doc/src/sgml/user-manag.sgml b/doc/src/sgml/user-manag.sgml
index 92a299d2d3..4f4c20ba3c 100644
--- a/doc/src/sgml/user-manag.sgml
+++ b/doc/src/sgml/user-manag.sgml
@@ -687,11 +687,19 @@ DROP ROLE doomed_role;
        <entry>Allow use of connection slots reserved via
        <xref linkend="guc-reserved-connections"/>.</entry>
       </row>
+      <row>
+       <entry>pg_create_connection</entry>
+       <entry>Allow users to specify a connection string directly in <link
+       linkend="sql-createsubscription"><command>CREATE
+       SUBSCRIPTION</command></link>.</entry>
+      </row>
       <row>
        <entry>pg_create_subscription</entry>
        <entry>Allow users with <literal>CREATE</literal> permission on the
-       database to issue
-       <link linkend="sql-createsubscription"><command>CREATE SUBSCRIPTION</command></link>.</entry>
+       database to issue <link
+       linkend="sql-createsubscription"><command>CREATE
+       SUBSCRIPTION</command></link>.  This role is a member of
+       <literal>pg_create_connection</literal>.</entry>
       </row>
      </tbody>
     </tgroup>
diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile
index ec7b6f5362..365c956dea 100644
--- a/src/backend/catalog/Makefile
+++ b/src/backend/catalog/Makefile
@@ -136,6 +136,7 @@ POSTGRES_BKI_DATA = $(addprefix $(top_srcdir)/src/include/catalog/,\
 	pg_collation.dat \
 	pg_conversion.dat \
 	pg_database.dat \
+	pg_foreign_data_wrapper.dat \
 	pg_language.dat \
 	pg_namespace.dat \
 	pg_opclass.dat \
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d6a978f136..f5c4ec8d99 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -23,6 +23,7 @@
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
+#include "foreign/foreign.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "storage/lmgr.h"
@@ -75,10 +76,18 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->runasowner = subform->subrunasowner;
 
 	/* Get conninfo */
-	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
-								   tup,
-								   Anum_pg_subscription_subconninfo);
-	sub->conninfo = TextDatumGetCString(datum);
+	if (OidIsValid(subform->subserver))
+	{
+		sub->conninfo = ForeignServerConnectionString(subform->subowner,
+													  subform->subserver);
+	}
+	else
+	{
+		datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
+									   tup,
+									   Anum_pg_subscription_subconninfo);
+		sub->conninfo = TextDatumGetCString(datum);
+	}
 
 	/* Get slotname */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 4206752881..e1abda41ba 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -781,3 +781,5 @@ GRANT pg_read_all_settings TO pg_monitor;
 GRANT pg_read_all_stats TO pg_monitor;
 
 GRANT pg_stat_scan_tables TO pg_monitor;
+
+GRANT pg_create_connection TO pg_create_subscription;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index edc82c11be..c46900182e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -25,14 +25,17 @@
 #include "catalog/objectaddress.h"
 #include "catalog/pg_authid_d.h"
 #include "catalog/pg_database_d.h"
+#include "catalog/pg_foreign_server.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
+#include "catalog/pg_user_mapping.h"
 #include "commands/dbcommands.h"
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/subscriptioncmds.h"
 #include "executor/executor.h"
+#include "foreign/foreign.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
@@ -574,6 +577,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	Datum		values[Natts_pg_subscription];
 	Oid			owner = GetUserId();
 	HeapTuple	tup;
+	Oid			serverid;
+	Oid			umid;
 	char	   *conninfo;
 	char		originname[NAMEDATALEN];
 	List	   *publications;
@@ -594,6 +599,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
+	if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED) && stmt->servername)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("option \"password_required\" invalid on subscriptions to a foreign server"),
+				 errhint("Use the \"password_required\" option on the user mappings associated with the foreign server.")));
+
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
 	 * the transaction leaves the created replication slot.  So we cannot run
@@ -604,9 +615,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
 
 	/*
-	 * We don't want to allow unprivileged users to be able to trigger
-	 * attempts to access arbitrary network destinations, so require the user
-	 * to have been specifically authorized to create subscriptions.
+	 * We don't want to allow unprivileged users to utilize the resources that
+	 * a subscription requires (such as a background worker), so require the
+	 * user to have been specifically authorized to create subscriptions.
 	 */
 	if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
 		ereport(ERROR,
@@ -666,14 +677,54 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	if (opts.synchronous_commit == NULL)
 		opts.synchronous_commit = "off";
 
-	conninfo = stmt->conninfo;
-	publications = stmt->publication;
-
 	/* Load the library providing us libpq calls. */
 	load_file("libpqwalreceiver", false);
 
-	/* Check the connection info string. */
-	walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
+	if (stmt->servername)
+	{
+		ForeignServer	*server;
+		UserMapping		*um;
+
+		Assert(!stmt->conninfo);
+		conninfo = NULL;
+
+		server = GetForeignServerByName(stmt->servername, false);
+		aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE);
+		if (aclresult != ACLCHECK_OK)
+			aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
+
+		um = GetUserMapping(owner, server->serverid);
+
+		serverid = server->serverid;
+		umid = um->umid;
+		conninfo = ForeignServerConnectionString(owner, serverid);
+	}
+	else
+	{
+		Assert(stmt->conninfo);
+
+		/*
+		 * We don't want to allow unprivileged users to be able to trigger
+		 * attempts to access arbitrary network destinations, so require the user
+		 * to have been specifically authorized to create connections.
+		 */
+		if (!has_privs_of_role(owner, ROLE_PG_CREATE_CONNECTION))
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("permission denied to create subscription with a connection string"),
+					 errdetail("Only roles with privileges of the \"%s\" role may create subscriptions with CONNECTION specified.",
+							   "pg_create_connection"),
+					 errhint("Create a subscription to a foreign server by specifying SERVER instead.")));
+
+		/* Check the connection info string. */
+		walrcv_check_conninfo(stmt->conninfo, opts.passwordrequired && !superuser());
+
+		serverid = InvalidOid;
+		umid = InvalidOid;
+		conninfo = stmt->conninfo;
+	}
+
+	publications = stmt->publication;
 
 	/* Everything ok, form a new tuple. */
 	memset(values, 0, sizeof(values));
@@ -697,8 +748,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
 	values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
 	values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
-	values[Anum_pg_subscription_subconninfo - 1] =
-		CStringGetTextDatum(conninfo);
+	values[Anum_pg_subscription_subserver - 1] = serverid;
+	if (!OidIsValid(serverid))
+		values[Anum_pg_subscription_subconninfo - 1] =
+			CStringGetTextDatum(conninfo);
+	else
+		nulls[Anum_pg_subscription_subconninfo - 1] = true;
 	if (opts.slot_name)
 		values[Anum_pg_subscription_subslotname - 1] =
 			DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
@@ -719,6 +774,20 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
 	recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
 
+	ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
+	if (stmt->servername)
+	{
+		ObjectAddress referenced;
+		Assert(OidIsValid(serverid) && OidIsValid(umid));
+
+		ObjectAddressSet(referenced, ForeignServerRelationId, serverid);
+		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+
+		ObjectAddressSet(referenced, UserMappingRelationId, umid);
+		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+	}
+
 	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
 	replorigin_create(originname);
 
@@ -835,8 +904,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	if (opts.enabled)
 		ApplyLauncherWakeupAtCommit();
 
-	ObjectAddressSet(myself, SubscriptionRelationId, subid);
-
 	InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
 
 	return myself;
@@ -1124,6 +1191,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 	memset(nulls, false, sizeof(nulls));
 	memset(replaces, false, sizeof(replaces));
 
+	ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
 	switch (stmt->kind)
 	{
 		case ALTER_SUBSCRIPTION_OPTIONS:
@@ -1191,6 +1260,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
 				{
+					if (OidIsValid(form->subserver))
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("option \"password_required\" invalid on subscriptions to a foreign server"),
+								 errhint("Use the \"password_required\" option on the user mappings associated with the foreign server.")));
+
 					/* Non-superuser may not disable password_required. */
 					if (!opts.passwordrequired && !superuser())
 						ereport(ERROR,
@@ -1244,7 +1319,77 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				break;
 			}
 
+		case ALTER_SUBSCRIPTION_SERVER:
+			{
+				ForeignServer	*new_server;
+				UserMapping		*new_um;
+				ObjectAddress	 referenced;
+				AclResult		 aclresult;
+
+				/*
+				 * Remove what was there before, either another foreign server
+				 * or a connection string.
+				 */
+				if (form->subserver)
+				{
+					UserMapping *old_um = GetUserMapping(form->subowner, form->subserver);
+
+					deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+													   DEPENDENCY_NORMAL,
+													   ForeignServerRelationId, form->subserver);
+					deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+													   DEPENDENCY_NORMAL,
+													   UserMappingRelationId, old_um->umid);
+				}
+				else
+				{
+					nulls[Anum_pg_subscription_subconninfo - 1] = true;
+					replaces[Anum_pg_subscription_subconninfo - 1] = true;
+				}
+
+				/*
+				 * Find the new server and user mapping. Check ACL of server
+				 * based on current user ID, but find the user mapping based
+				 * on the subscription owner.
+				 */
+				new_server = GetForeignServerByName(stmt->servername, false);
+				aclresult = object_aclcheck(ForeignServerRelationId,
+											new_server->serverid, GetUserId(), ACL_USAGE);
+				if (aclresult != ACLCHECK_OK)
+					aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER,
+								   new_server->servername);
+
+				new_um = GetUserMapping(form->subowner, new_server->serverid);
+
+				values[Anum_pg_subscription_subserver - 1] = new_server->serverid;
+				replaces[Anum_pg_subscription_subserver - 1] = true;
+
+				ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid);
+				recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+
+				ObjectAddressSet(referenced, UserMappingRelationId, new_um->umid);
+				recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+				update_tuple = true;
+			}
+			break;
+
 		case ALTER_SUBSCRIPTION_CONNECTION:
+			/* remove reference to foreign server and dependencies, if present */
+			if (form->subserver)
+			{
+				UserMapping *old_um = GetUserMapping(form->subowner, form->subserver);
+
+				deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+												   DEPENDENCY_NORMAL,
+												   ForeignServerRelationId, form->subserver);
+				deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+												   DEPENDENCY_NORMAL,
+												   UserMappingRelationId, old_um->umid);
+
+				values[Anum_pg_subscription_subserver - 1] = InvalidOid;
+				replaces[Anum_pg_subscription_subserver - 1] = true;
+			}
+
 			/* Load the library providing us libpq calls. */
 			load_file("libpqwalreceiver", false);
 			/* Check the connection info string. */
@@ -1455,8 +1600,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	table_close(rel, RowExclusiveLock);
 
-	ObjectAddressSet(myself, SubscriptionRelationId, subid);
-
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
 	/* Wake up related replication workers to handle this change quickly. */
@@ -1541,9 +1684,17 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	subname = pstrdup(NameStr(*DatumGetName(datum)));
 
 	/* Get conninfo */
-	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
-								   Anum_pg_subscription_subconninfo);
-	conninfo = TextDatumGetCString(datum);
+	if (OidIsValid(form->subserver))
+	{
+		conninfo = ForeignServerConnectionString(form->subowner,
+												 form->subserver);
+	}
+	else
+	{
+		datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
+									   Anum_pg_subscription_subconninfo);
+		conninfo = TextDatumGetCString(datum);
+	}
 
 	/* Get slotname */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
@@ -1644,6 +1795,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 
 	/* Clean up dependencies */
+	deleteDependencyRecordsFor(SubscriptionRelationId, subid, false);
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
 	/* Remove any associated relation synchronization states. */
@@ -1853,6 +2005,17 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 		aclcheck_error(aclresult, OBJECT_DATABASE,
 					   get_database_name(MyDatabaseId));
 
+	if (form->subserver)
+	{
+		UserMapping *old_um = GetUserMapping(form->subowner, form->subserver);
+		UserMapping *new_um = GetUserMapping(newOwnerId, form->subserver);
+
+		if (changeDependencyFor(SubscriptionRelationId, form->oid,
+								UserMappingRelationId, old_um->umid, new_um->umid) != 1)
+			elog(ERROR, "could not change user mapping dependency for subscription %u",
+				 form->oid);
+	}
+
 	form->subowner = newOwnerId;
 	CatalogTupleUpdate(rel, &tup->t_self, tup);
 
diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c
index fc3edef2a8..5a800fc48f 100644
--- a/src/backend/foreign/foreign.c
+++ b/src/backend/foreign/foreign.c
@@ -18,11 +18,14 @@
 #include "catalog/pg_foreign_server.h"
 #include "catalog/pg_foreign_table.h"
 #include "catalog/pg_user_mapping.h"
+#include "commands/defrem.h"
 #include "foreign/fdwapi.h"
 #include "foreign/foreign.h"
 #include "funcapi.h"
 #include "lib/stringinfo.h"
+#include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "replication/walreceiver.h"
 #include "utils/builtins.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -190,6 +193,94 @@ GetForeignServerByName(const char *srvname, bool missing_ok)
 }
 
 
+/*
+ * Escape a connection option value. Helper for options_to_connstr().
+ */
+static char *
+escape_value(char *val)
+{
+	StringInfoData result;
+
+	initStringInfo(&result);
+
+	for (int i = 0; val[i] != '\0'; i++)
+	{
+		if (val[i] == '\\' || val[i] == '\'')
+			appendStringInfoChar(&result, '\\');
+		appendStringInfoChar(&result, val[i]);
+	}
+
+	return result.data;
+}
+
+
+/*
+ * Helper for ForeignServerConnectionString() and pg_connection_validator().
+ *
+ * Transform a List of DefElem into a connection string.
+ *
+ * XXX: might leak memory, investigate
+ */
+static char *
+options_to_connstr(List *options)
+{
+	StringInfoData	 connstr;
+	ListCell		*lc;
+	bool			 first = true;
+
+	initStringInfo(&connstr);
+	foreach(lc, options)
+	{
+		DefElem *d = (DefElem *) lfirst(lc);
+		char *name = d->defname;
+		char *value;
+
+		/* not a libpq option; skip */
+		if (strcmp(name, "password_required") == 0)
+			continue;
+
+		/* XXX: pfree() result of defGetString() if needed? */
+		value = escape_value(defGetString(d));
+
+		appendStringInfo(&connstr, "%s%s = '%s'",
+						 first ? "" : " ", name, value);
+		first = false;
+
+		pfree(value);
+	}
+
+	/* override client_encoding */
+	appendStringInfo(&connstr, "%sclient_encoding = '%s'",
+					 first ? "" : " ", GetDatabaseEncodingName());
+
+	return connstr.data;
+}
+
+
+/*
+ * Given a user ID and server ID, return a postgres connection string suitable
+ * to pass to libpq.
+ *
+ * XXX: might leak memory, investigate
+ */
+char *
+ForeignServerConnectionString(Oid userid, Oid serverid)
+{
+	ForeignServer	*server	 = GetForeignServer(serverid);
+	UserMapping		*um		 = GetUserMapping(userid, serverid);
+	List			*options = list_concat(um->options, server->options);
+	char			*connstr;
+
+	connstr = options_to_connstr(options);
+
+	pfree(server);
+	pfree(um);
+	list_free(options);
+
+	return connstr;
+}
+
+
 /*
  * GetUserMapping - look up the user mapping.
  *
@@ -599,6 +690,129 @@ is_conninfo_option(const char *option, Oid context)
 	return false;
 }
 
+/*
+ * pg_connection_handler
+ *
+ * pg_connection_fdw is not used for foreign tables, so the handler should
+ * never be called.
+ */
+Datum
+pg_connection_handler(PG_FUNCTION_ARGS)
+{
+	elog(ERROR, "pg_connection_handler unexpectedly called");
+}
+
+/*
+ * Option validator for pg_connection_fdw: arg 0 is the options datum, arg 1
+ * the OID of the catalog they belong to. Invalid options raise an ERROR;
+ * otherwise returns true.
+ *
+ * XXX: try to unify with validators for CREATE SUBSCRIPTION ... CONNECTION,
+ * postgres_fdw, and dblink. Also investigate if memory leaks are a problem.
+ */
+Datum
+pg_connection_validator(PG_FUNCTION_ARGS)
+{
+	List			*options_list = untransformRelOptions(PG_GETARG_DATUM(0));
+	Oid				 catalog	  = PG_GETARG_OID(1);
+
+	if (catalog == ForeignServerRelationId)
+	{
+		char		*conninfo;
+		ListCell	*lc;
+
+		foreach(lc, options_list)
+		{
+			DefElem *d = (DefElem *) lfirst(lc);
+
+			if (strcmp(d->defname, "client_encoding") == 0)
+				ereport(ERROR,
+						(errmsg("cannot specify client_encoding for pg_connection_fdw")));
+
+			if (strcmp(d->defname, "user") == 0 ||
+				strcmp(d->defname, "password") == 0 ||
+				strcmp(d->defname, "sslpassword") == 0 ||
+				strcmp(d->defname, "password_required") == 0)
+				ereport(ERROR,
+						(errmsg("invalid option \"%s\" for pg_connection_fdw",
+								d->defname),
+						 errhint("Specify option \"%s\" for a user mapping associated with the server instead.",
+								 d->defname)));
+		}
+
+		conninfo = options_to_connstr(options_list);
+
+		/* Load the library providing us libpq calls. */
+		load_file("libpqwalreceiver", false);
+
+		walrcv_check_conninfo(conninfo, false);
+	}
+	else if (catalog == UserMappingRelationId)
+	{
+		bool		 password_required = true;
+		bool		 password_provided = false;
+		ListCell	*lc;
+
+		foreach(lc, options_list)
+		{
+			DefElem *d = (DefElem *) lfirst(lc);
+
+			if (strcmp(d->defname, "password_required") == 0)
+			{
+				/*
+				 * Only the superuser may set this option on a user mapping, or
+				 * alter a user mapping on which this option is set. We allow a
+				 * user to clear this option if it's set - in fact, we don't have
+				 * a choice since we can't see the old mapping when validating an
+				 * alter.
+				 */
+				if (!superuser() && !defGetBoolean(d))
+					ereport(ERROR,
+							(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+							 errmsg("password_required=false is superuser-only for pg_connection_fdw"),
+							 errhint("User mappings with the password_required option set to false may only be created or modified by the superuser.")));
+
+				password_required = defGetBoolean(d);
+			}
+
+			if ((strcmp(d->defname, "sslkey") == 0 || strcmp(d->defname, "sslcert") == 0) && !superuser())
+				ereport(ERROR,
+						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+						 errmsg("sslcert and sslkey are superuser-only for pg_connection_fdw"),
+						 errhint("User mappings with the sslcert or sslkey options set may only be created or modified by the superuser.")));
+
+			if (strcmp(d->defname, "password") == 0)
+				password_provided = true;
+
+			if (strcmp(d->defname, "user") != 0 &&
+				strcmp(d->defname, "password") != 0 &&
+				strcmp(d->defname, "sslpassword") != 0 &&
+				strcmp(d->defname, "sslkey") != 0 &&
+				strcmp(d->defname, "sslcert") != 0 &&
+				strcmp(d->defname, "password_required") != 0)
+				ereport(ERROR,
+						(errmsg("invalid user mapping option \"%s\" for pg_connection_fdw",
+								d->defname)));
+		}
+
+		if (password_required && !password_provided)
+			ereport(ERROR,
+					(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+					 errmsg("password is required"),
+					 errdetail("Non-superusers must provide a password in the connection string.")));
+	}
+	else if (catalog == ForeignTableRelationId)
+		ereport(ERROR,
+				(errmsg("cannot create foreign table using a pg_connection_fdw server"),
+				 errhint("Use the server with CREATE SUBSCRIPTION instead.")));
+	else if (catalog == AttributeRelationId)
+		elog(ERROR, "unexpected call to pg_connection_validator for pg_attribute catalog");
+	else
+		elog(ERROR, "unexpected call to pg_connection_validator for catalog %u", catalog);
+
+	PG_RETURN_BOOL(true);
+}
+
 
 /*
  * Validate the generic option given to SERVER or USER MAPPING.
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 63f172e175..259b5ca42b 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -10638,6 +10638,16 @@ CreateSubscriptionStmt:
 					n->options = $8;
 					$$ = (Node *) n;
 				}
+			| CREATE SUBSCRIPTION name SERVER name PUBLICATION name_list opt_definition
+				{
+					CreateSubscriptionStmt *n =
+						makeNode(CreateSubscriptionStmt);
+					n->subname = $3;
+					n->servername = $5;
+					n->publication = $7;
+					n->options = $8;
+					$$ = (Node *) n;
+				}
 		;
 
 /*****************************************************************************
@@ -10667,6 +10677,16 @@ AlterSubscriptionStmt:
 					n->conninfo = $5;
 					$$ = (Node *) n;
 				}
+			| ALTER SUBSCRIPTION name SERVER name
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+
+					n->kind = ALTER_SUBSCRIPTION_SERVER;
+					n->subname = $3;
+					n->servername = $5;
+					$$ = (Node *) n;
+				}
 			| ALTER SUBSCRIPTION name REFRESH PUBLICATION opt_definition
 				{
 					AlterSubscriptionStmt *n =
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 21abf34ef7..be63ab5a5d 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4003,7 +4003,9 @@ maybe_reread_subscription(void)
 }
 
 /*
- * Callback from subscription syscache invalidation.
+ * Callback from subscription syscache invalidation. Also needed for server or
+ * user mapping invalidation, which can change the connection information for
+ * subscriptions that connect using a server object.
  */
 static void
 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -4639,6 +4641,14 @@ InitializeLogRepWorker(void)
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
 								  subscription_change_cb,
 								  (Datum) 0);
+	/* Keep us informed about foreign server changes. */
+	CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+								  subscription_change_cb,
+								  (Datum) 0);
+	/* Keep us informed about user mapping changes. */
+	CacheRegisterSyscacheCallback(USERMAPPINGOID,
+								  subscription_change_cb,
+								  (Datum) 0);
 
 	CacheRegisterSyscacheCallback(AUTHOID,
 								  subscription_change_cb,
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 050a831226..b0664b6bbb 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -2066,6 +2066,27 @@ selectDumpableStatisticsObject(StatsExtInfo *sobj, Archive *fout)
 		sobj->dobj.dump = DUMP_COMPONENT_NONE;
 }
 
+/*
+ * selectDumpableFdw: policy-setting subroutine
+ *		Mark foreign data wrapper as to be dumped or not
+ *
+ * Foreign data wrappers do not belong to any particular namespace.  To
+ * identify built-in foreign data wrappers, we must resort to checking whether
+ * the foreign data wrapper's OID is in the range reserved for initdb.
+ */
+static void
+selectDumpableFdw(FdwInfo *fdwinfo, Archive *fout)
+{
+	if (checkExtensionMembership(&fdwinfo->dobj, fout))
+		return;					/* extension membership overrides all else */
+
+	if (fdwinfo->dobj.catId.oid <= (Oid) g_last_builtin_oid)
+		fdwinfo->dobj.dump = DUMP_COMPONENT_NONE;
+	else
+		fdwinfo->dobj.dump = fout->dopt->include_everything ?
+			DUMP_COMPONENT_ALL : DUMP_COMPONENT_NONE;
+}
+
 /*
  * selectDumpableObject: policy-setting subroutine
  *		Mark a generic dumpable object as to be dumped or not
@@ -4633,6 +4654,7 @@ getSubscriptions(Archive *fout)
 	int			i_subdisableonerr;
 	int			i_subpasswordrequired;
 	int			i_subrunasowner;
+	int			i_subservername;
 	int			i_subconninfo;
 	int			i_subslotname;
 	int			i_subsynccommit;
@@ -4693,18 +4715,27 @@ getSubscriptions(Archive *fout)
 		appendPQExpBufferStr(query,
 							 " s.subpasswordrequired,\n"
 							 " s.subrunasowner,\n"
-							 " s.suborigin\n");
+							 " s.suborigin,\n");
 	else
 		appendPQExpBuffer(query,
 						  " 't' AS subpasswordrequired,\n"
 						  " 't' AS subrunasowner,\n"
-						  " '%s' AS suborigin\n",
+						  " '%s' AS suborigin,\n",
 						  LOGICALREP_ORIGIN_ANY);
 
-	appendPQExpBufferStr(query,
-						 "FROM pg_subscription s\n"
-						 "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
-						 "                   WHERE datname = current_database())");
+	if (fout->remoteVersion >= 170000)
+		appendPQExpBufferStr(query,
+							 " fs.srvname AS subservername\n"
+							 "FROM pg_subscription s LEFT JOIN \n"
+							 "pg_foreign_server fs ON s.subserver = fs.oid\n"
+							 "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
+							 "                   WHERE datname = current_database())");
+	else
+		appendPQExpBufferStr(query,
+							 " NULL AS subservername\n"
+							 "FROM pg_subscription s\n"
+							 "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
+							 "                   WHERE datname = current_database())");
 
 	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 
@@ -4724,6 +4755,7 @@ getSubscriptions(Archive *fout)
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
 	i_subrunasowner = PQfnumber(res, "subrunasowner");
+	i_subservername = PQfnumber(res, "subservername");
 	i_subconninfo = PQfnumber(res, "subconninfo");
 	i_subslotname = PQfnumber(res, "subslotname");
 	i_subsynccommit = PQfnumber(res, "subsynccommit");
@@ -4741,7 +4773,10 @@ getSubscriptions(Archive *fout)
 		AssignDumpId(&subinfo[i].dobj);
 		subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
 		subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner));
-
+		if (PQgetisnull(res, i, i_subservername))
+			subinfo[i].subservername = NULL;
+		else
+			subinfo[i].subservername = pg_strdup(PQgetvalue(res, i, i_subservername));
 		subinfo[i].subbinary =
 			pg_strdup(PQgetvalue(res, i, i_subbinary));
 		subinfo[i].substream =
@@ -4804,9 +4839,17 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
 					  qsubname);
 
-	appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
+	appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s ",
 					  qsubname);
-	appendStringLiteralAH(query, subinfo->subconninfo, fout);
+	if (subinfo->subservername)
+	{
+		appendPQExpBuffer(query, "SERVER %s", fmtId(subinfo->subservername));
+	}
+	else
+	{
+		appendPQExpBuffer(query, "CONNECTION ");
+		appendStringLiteralAH(query, subinfo->subconninfo, fout);
+	}
 
 	/* Build list of quoted publications and append them to query. */
 	if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
@@ -9513,7 +9556,7 @@ getForeignDataWrappers(Archive *fout, int *numForeignDataWrappers)
 		fdwinfo[i].fdwoptions = pg_strdup(PQgetvalue(res, i, i_fdwoptions));
 
 		/* Decide whether we want to dump it */
-		selectDumpableObject(&(fdwinfo[i].dobj), fout);
+		selectDumpableFdw(&fdwinfo[i], fout);
 
 		/* Mark whether FDW has an ACL */
 		if (!PQgetisnull(res, i, i_fdwacl))
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 673ca5c92d..0a94b9b7ae 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -667,6 +667,7 @@ typedef struct _SubscriptionInfo
 	char	   *subdisableonerr;
 	char	   *subpasswordrequired;
 	char	   *subrunasowner;
+	char	   *subservername;
 	char	   *subconninfo;
 	char	   *subslotname;
 	char	   *subsynccommit;
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 5077e7b358..fb4ea96968 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5793,7 +5793,7 @@ listForeignDataWrappers(const char *pattern, bool verbose)
 						  gettext_noop("Description"));
 	}
 
-	appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_foreign_data_wrapper fdw\n");
+	appendPQExpBufferStr(&buf, "\nFROM (SELECT tableoid, * FROM pg_catalog.pg_foreign_data_wrapper WHERE oid>=16384) fdw\n");
 
 	if (verbose)
 		appendPQExpBufferStr(&buf,
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 049801186c..230683a850 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -3314,7 +3314,7 @@ psql_completion(const char *text, int start, int end)
 
 /* CREATE SUBSCRIPTION */
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAny))
-		COMPLETE_WITH("CONNECTION");
+		COMPLETE_WITH("SERVER", "CONNECTION");
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", MatchAny))
 		COMPLETE_WITH("PUBLICATION");
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION",
diff --git a/src/include/catalog/meson.build b/src/include/catalog/meson.build
index dcb3c5f766..01e92089bf 100644
--- a/src/include/catalog/meson.build
+++ b/src/include/catalog/meson.build
@@ -83,6 +83,7 @@ bki_data = [
   'pg_collation.dat',
   'pg_conversion.dat',
   'pg_database.dat',
+  'pg_foreign_data_wrapper.dat',
   'pg_language.dat',
   'pg_namespace.dat',
   'pg_opclass.dat',
diff --git a/src/include/catalog/pg_authid.dat b/src/include/catalog/pg_authid.dat
index 6b4a0aaaad..8fce457ab1 100644
--- a/src/include/catalog/pg_authid.dat
+++ b/src/include/catalog/pg_authid.dat
@@ -94,5 +94,10 @@
   rolcreaterole => 'f', rolcreatedb => 'f', rolcanlogin => 'f',
   rolreplication => 'f', rolbypassrls => 'f', rolconnlimit => '-1',
   rolpassword => '_null_', rolvaliduntil => '_null_' },
+{ oid => '6123', oid_symbol => 'ROLE_PG_CREATE_CONNECTION',
+  rolname => 'pg_create_connection', rolsuper => 'f', rolinherit => 't',
+  rolcreaterole => 'f', rolcreatedb => 'f', rolcanlogin => 'f',
+  rolreplication => 'f', rolbypassrls => 'f', rolconnlimit => '-1',
+  rolpassword => '_null_', rolvaliduntil => '_null_' },
 
 ]
diff --git a/src/include/catalog/pg_foreign_data_wrapper.dat b/src/include/catalog/pg_foreign_data_wrapper.dat
new file mode 100644
index 0000000000..7d489bf849
--- /dev/null
+++ b/src/include/catalog/pg_foreign_data_wrapper.dat
@@ -0,0 +1,22 @@
+#----------------------------------------------------------------------
+#
+# pg_foreign_data_wrapper.dat
+#    Initial contents of the pg_foreign_data_wrapper system catalog.
+#
+# Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+# Portions Copyright (c) 1994, Regents of the University of California
+#
+# src/include/catalog/pg_foreign_data_wrapper.dat
+#
+#----------------------------------------------------------------------
+
+[
+
+{ oid => '6015', oid_symbol => 'PG_CONNECTION_FDW',
+  descr => 'Pseudo FDW for connections to Postgres',
+  fdwname => 'pg_connection_fdw', fdwowner => 'POSTGRES',
+  fdwhandler => 'pg_connection_handler',
+  fdwvalidator => 'pg_connection_validator',
+  fdwacl => '_null_', fdwoptions => '_null_'},
+
+]
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9052f5262a..8e81e5f6bb 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -7516,6 +7516,14 @@
   proname => 'postgresql_fdw_validator', prorettype => 'bool',
   proargtypes => '_text oid', prosrc => 'postgresql_fdw_validator' },
 
+{ oid => '6122', descr => '(internal)',
+  proname => 'pg_connection_handler', prorettype => 'fdw_handler',
+  proargtypes => '', prosrc => 'pg_connection_handler' },
+
+{ oid => '6124', descr => '(internal)',
+  proname => 'pg_connection_validator', prorettype => 'bool',
+  proargtypes => '_text oid', prosrc => 'pg_connection_validator' },
+
 { oid => '2290', descr => 'I/O',
   proname => 'record_in', provolatile => 's', prorettype => 'record',
   proargtypes => 'cstring oid int4', prosrc => 'record_in' },
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index e0b91eacd2..4b83c8ca8b 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -93,9 +93,12 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subrunasowner;	/* True if replication should execute as the
 								 * subscription owner */
 
+	Oid			subserver;		/* Set if connecting with server */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
-	text		subconninfo BKI_FORCE_NOT_NULL;
+	text		subconninfo BKI_FORCE_NULL;	/* Set if connecting with
+											   connection string */
 
 	/* Slot name on publisher */
 	NameData	subslotname BKI_FORCE_NULL;
diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h
index 5256d4d91f..7058335d63 100644
--- a/src/include/foreign/foreign.h
+++ b/src/include/foreign/foreign.h
@@ -69,6 +69,7 @@ extern ForeignServer *GetForeignServerExtended(Oid serverid,
 											   bits16 flags);
 extern ForeignServer *GetForeignServerByName(const char *srvname,
 											 bool missing_ok);
+extern char *ForeignServerConnectionString(Oid userid, Oid serverid);
 extern UserMapping *GetUserMapping(Oid userid, Oid serverid);
 extern ForeignDataWrapper *GetForeignDataWrapper(Oid fdwid);
 extern ForeignDataWrapper *GetForeignDataWrapperExtended(Oid fdwid,
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index e494309da8..4b316d4664 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -4040,6 +4040,7 @@ typedef struct CreateSubscriptionStmt
 {
 	NodeTag		type;
 	char	   *subname;		/* Name of the subscription */
+	char	   *servername;		/* Server name of publisher */
 	char	   *conninfo;		/* Connection string to publisher */
 	List	   *publication;	/* One or more publication to subscribe to */
 	List	   *options;		/* List of DefElem nodes */
@@ -4048,6 +4049,7 @@ typedef struct CreateSubscriptionStmt
 typedef enum AlterSubscriptionType
 {
 	ALTER_SUBSCRIPTION_OPTIONS,
+	ALTER_SUBSCRIPTION_SERVER,
 	ALTER_SUBSCRIPTION_CONNECTION,
 	ALTER_SUBSCRIPTION_SET_PUBLICATION,
 	ALTER_SUBSCRIPTION_ADD_PUBLICATION,
@@ -4062,6 +4064,7 @@ typedef struct AlterSubscriptionStmt
 	NodeTag		type;
 	AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */
 	char	   *subname;		/* Name of the subscription */
+	char	   *servername;		/* Server name of publisher */
 	char	   *conninfo;		/* Connection string to publisher */
 	List	   *publication;	/* One or more publication to subscribe to */
 	List	   *options;		/* List of DefElem nodes */
diff --git a/src/test/regress/expected/foreign_data.out b/src/test/regress/expected/foreign_data.out
index 1dfe23cc1e..b551405a3d 100644
--- a/src/test/regress/expected/foreign_data.out
+++ b/src/test/regress/expected/foreign_data.out
@@ -25,7 +25,7 @@ CREATE FOREIGN DATA WRAPPER dummy;
 COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
 CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
 -- At this point we should have 2 built-in wrappers and no servers.
-SELECT fdwname, fdwhandler::regproc, fdwvalidator::regproc, fdwoptions FROM pg_foreign_data_wrapper ORDER BY 1, 2, 3;
+SELECT fdwname, fdwhandler::regproc, fdwvalidator::regproc, fdwoptions FROM pg_foreign_data_wrapper WHERE oid >= 16384 ORDER BY 1, 2, 3;
   fdwname   | fdwhandler |       fdwvalidator       | fdwoptions 
 ------------+------------+--------------------------+------------
  dummy      | -          | -                        | 
@@ -394,6 +394,47 @@ CREATE SERVER t2 FOREIGN DATA WRAPPER foo;
 
 RESET ROLE;
 REVOKE regress_test_indirect FROM regress_test_role;
+-- test SERVER ... FOR CONNECTION ONLY
+SET ROLE regress_test_role;
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw;   -- ERROR: not a member of pg_create_connection
+ERROR:  permission denied for foreign-data wrapper pg_connection_fdw
+RESET ROLE;
+GRANT USAGE ON FOREIGN DATA WRAPPER pg_connection_fdw TO regress_test_role;
+SET ROLE regress_test_role;
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (client_encoding 'foo'); --fails
+ERROR:  cannot specify client_encoding for pg_connection_fdw
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (user 'foo'); --fails
+ERROR:  invalid option "user" for pg_connection_fdw
+HINT:  Specify option "user" for a user mapping associated with the server instead.
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (password 'foo'); --fails
+ERROR:  invalid option "password" for pg_connection_fdw
+HINT:  Specify option "password" for a user mapping associated with the server instead.
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (password_required 'true'); --fails
+ERROR:  invalid option "password_required" for pg_connection_fdw
+HINT:  Specify option "password_required" for a user mapping associated with the server instead.
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw;
+IMPORT FOREIGN SCHEMA foo FROM SERVER t3 INTO bar; -- fails
+ERROR:  schema "bar" does not exist
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x'); -- fails
+ERROR:  password is required
+DETAIL:  Non-superusers must provide a password in the connection string.
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password_required 'false'); -- fails
+ERROR:  password_required=false is superuser-only for pg_connection_fdw
+HINT:  User mappings with the password_required option set to false may only be created or modified by the superuser.
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', application_name 'nonsense'); -- fails
+ERROR:  invalid user mapping option "application_name" for pg_connection_fdw
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+RESET ROLE;
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x'); -- still fails
+ERROR:  password is required
+DETAIL:  Non-superusers must provide a password in the connection string.
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password_required 'false');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+DROP SERVER t3;
+REVOKE USAGE ON FOREIGN DATA WRAPPER pg_connection_fdw FROM regress_test_role;
 -- ALTER SERVER
 ALTER SERVER s0;                                            -- ERROR
 ERROR:  syntax error at or near ";"
@@ -966,13 +1007,14 @@ NOTICE:  relation "doesnt_exist_ft1" does not exist, skipping
 ALTER FOREIGN TABLE IF EXISTS doesnt_exist_ft1 RENAME TO foreign_table_1;
 NOTICE:  relation "doesnt_exist_ft1" does not exist, skipping
 -- Information schema
-SELECT * FROM information_schema.foreign_data_wrappers ORDER BY 1, 2;
- foreign_data_wrapper_catalog | foreign_data_wrapper_name | authorization_identifier  | library_name | foreign_data_wrapper_language 
-------------------------------+---------------------------+---------------------------+--------------+-------------------------------
- regression                   | dummy                     | regress_foreign_data_user |              | c
- regression                   | foo                       | regress_foreign_data_user |              | c
- regression                   | postgresql                | regress_foreign_data_user |              | c
-(3 rows)
+SELECT foreign_data_wrapper_catalog,foreign_data_wrapper_name FROM information_schema.foreign_data_wrappers ORDER BY 1, 2;
+ foreign_data_wrapper_catalog | foreign_data_wrapper_name 
+------------------------------+---------------------------
+ regression                   | dummy
+ regression                   | foo
+ regression                   | pg_connection_fdw
+ regression                   | postgresql
+(4 rows)
 
 SELECT * FROM information_schema.foreign_data_wrapper_options ORDER BY 1, 2, 3;
  foreign_data_wrapper_catalog | foreign_data_wrapper_name | option_name  | option_value 
@@ -2186,7 +2228,7 @@ NOTICE:  drop cascades to server s0
 \c
 DROP ROLE regress_foreign_data_user;
 -- At this point we should have no wrappers, no servers, and no mappings.
-SELECT fdwname, fdwhandler, fdwvalidator, fdwoptions FROM pg_foreign_data_wrapper;
+SELECT fdwname, fdwhandler, fdwvalidator, fdwoptions FROM pg_foreign_data_wrapper WHERE oid >= 16384;
  fdwname | fdwhandler | fdwvalidator | fdwoptions 
 ---------+------------+--------------+------------
 (0 rows)
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index b15eddbff3..738f1b1678 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -144,6 +144,44 @@ ERROR:  could not connect to the publisher: invalid port number: "-1"
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
+-- temporarily revoke pg_create_connection from pg_create_subscription
+-- to test that CREATE SUBSCRIPTION ... CONNECTION fails
+RESET SESSION AUTHORIZATION;
+REVOKE pg_create_connection FROM pg_create_subscription;
+GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+-- fail - not a member of pg_create_connection, cannot use CONNECTION
+CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+ERROR:  permission denied to create subscription with a connection string
+DETAIL:  Only roles with privileges of the "pg_create_connection" role may create subscriptions with CONNECTION specified.
+HINT:  Create a subscription to a foreign server by specifying SERVER instead.
+-- re-grant pg_create_connection to pg_create_subscription
+RESET SESSION AUTHORIZATION;
+GRANT pg_create_connection TO pg_create_subscription;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+DROP SUBSCRIPTION regress_testsub6;
+-- test using a server object instead of connection string
+RESET SESSION AUTHORIZATION;
+CREATE SERVER regress_testserver FOREIGN DATA WRAPPER pg_connection_fdw;
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver
+  OPTIONS (password_required 'false');
+GRANT USAGE ON FOREIGN SERVER regress_testserver TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver PUBLICATION testpub
+  WITH (slot_name = NONE, connect = false);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+RESET SESSION AUTHORIZATION;
+ALTER SUBSCRIPTION regress_testsub6 OWNER TO regress_subscription_user; -- fail, no user mapping
+ERROR:  user mapping not found for user "regress_subscription_user", server "regress_testserver"
+DROP SUBSCRIPTION regress_testsub6;
+DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver;
+DROP SERVER regress_testserver;
+REVOKE CREATE ON DATABASE regression FROM regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user;
 \dRs+
                                                                                                            List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit |          Conninfo           | Skip LSN 
diff --git a/src/test/regress/sql/foreign_data.sql b/src/test/regress/sql/foreign_data.sql
index eefb860adc..15a28de9ac 100644
--- a/src/test/regress/sql/foreign_data.sql
+++ b/src/test/regress/sql/foreign_data.sql
@@ -36,7 +36,7 @@ COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
 CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
 
 -- At this point we should have 2 built-in wrappers and no servers.
-SELECT fdwname, fdwhandler::regproc, fdwvalidator::regproc, fdwoptions FROM pg_foreign_data_wrapper ORDER BY 1, 2, 3;
+SELECT fdwname, fdwhandler::regproc, fdwvalidator::regproc, fdwoptions FROM pg_foreign_data_wrapper WHERE oid >= 16384 ORDER BY 1, 2, 3;
 SELECT srvname, srvoptions FROM pg_foreign_server;
 SELECT * FROM pg_user_mapping;
 
@@ -180,6 +180,41 @@ CREATE SERVER t2 FOREIGN DATA WRAPPER foo;
 RESET ROLE;
 REVOKE regress_test_indirect FROM regress_test_role;
 
+-- test SERVER ... FOR CONNECTION ONLY
+
+SET ROLE regress_test_role;
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw;   -- ERROR: not a member of pg_create_connection
+RESET ROLE;
+GRANT USAGE ON FOREIGN DATA WRAPPER pg_connection_fdw TO regress_test_role;
+SET ROLE regress_test_role;
+
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (client_encoding 'foo'); --fails
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (user 'foo'); --fails
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (password 'foo'); --fails
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (password_required 'true'); --fails
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw;
+
+IMPORT FOREIGN SCHEMA foo FROM SERVER t3 INTO bar; -- fails
+
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x'); -- fails
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password_required 'false'); -- fails
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', application_name 'nonsense'); -- fails
+
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+
+RESET ROLE;
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x'); -- still fails
+
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password_required 'false');
+
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+DROP SERVER t3;
+REVOKE USAGE ON FOREIGN DATA WRAPPER pg_connection_fdw FROM regress_test_role;
+
 -- ALTER SERVER
 ALTER SERVER s0;                                            -- ERROR
 ALTER SERVER s0 OPTIONS (a '1');                            -- ERROR
@@ -453,7 +488,7 @@ ALTER FOREIGN TABLE IF EXISTS doesnt_exist_ft1 RENAME TO foreign_table_1;
 
 -- Information schema
 
-SELECT * FROM information_schema.foreign_data_wrappers ORDER BY 1, 2;
+SELECT foreign_data_wrapper_catalog,foreign_data_wrapper_name FROM information_schema.foreign_data_wrappers ORDER BY 1, 2;
 SELECT * FROM information_schema.foreign_data_wrapper_options ORDER BY 1, 2, 3;
 SELECT * FROM information_schema.foreign_servers ORDER BY 1, 2;
 SELECT * FROM information_schema.foreign_server_options ORDER BY 1, 2, 3;
@@ -861,6 +896,6 @@ DROP FOREIGN DATA WRAPPER dummy CASCADE;
 DROP ROLE regress_foreign_data_user;
 
 -- At this point we should have no wrappers, no servers, and no mappings.
-SELECT fdwname, fdwhandler, fdwvalidator, fdwoptions FROM pg_foreign_data_wrapper;
+SELECT fdwname, fdwhandler, fdwvalidator, fdwoptions FROM pg_foreign_data_wrapper WHERE oid >= 16384;
 SELECT srvname, srvoptions FROM pg_foreign_server;
 SELECT * FROM pg_user_mapping;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 444e563ff3..bef6bc8074 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -88,6 +88,45 @@ CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'port=-1' PUBLICATION testpub;
 -- fail - invalid connection string during ALTER
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 
+-- temporarily revoke pg_create_connection from pg_create_subscription
+-- to test that CREATE SUBSCRIPTION ... CONNECTION fails
+RESET SESSION AUTHORIZATION;
+REVOKE pg_create_connection FROM pg_create_subscription;
+GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+
+-- fail - not a member of pg_create_connection, cannot use CONNECTION
+CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+
+-- re-grant pg_create_connection to pg_create_subscription
+RESET SESSION AUTHORIZATION;
+GRANT pg_create_connection TO pg_create_subscription;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+
+CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+DROP SUBSCRIPTION regress_testsub6;
+
+-- test using a server object instead of connection string
+
+RESET SESSION AUTHORIZATION;
+CREATE SERVER regress_testserver FOREIGN DATA WRAPPER pg_connection_fdw;
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver
+  OPTIONS (password_required 'false');
+GRANT USAGE ON FOREIGN SERVER regress_testserver TO regress_subscription_user3;
+
+SET SESSION AUTHORIZATION regress_subscription_user3;
+CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver PUBLICATION testpub
+  WITH (slot_name = NONE, connect = false);
+RESET SESSION AUTHORIZATION;
+
+ALTER SUBSCRIPTION regress_testsub6 OWNER TO regress_subscription_user; -- fail, no user mapping
+DROP SUBSCRIPTION regress_testsub6;
+
+DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver;
+DROP SERVER regress_testserver;
+REVOKE CREATE ON DATABASE regression FROM regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user;
+
 \dRs+
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 16c7fb94eb..d12c182f16 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -27,6 +27,8 @@ $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a");
 $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_ins2 AS SELECT generate_series(1,1002) AS a");
 $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a");
 $node_publisher->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
@@ -65,6 +67,7 @@ $node_publisher->safe_psql('postgres',
 # Setup structure on subscriber
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_notrep (a int)");
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins2 (a int)");
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int)");
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
 $node_subscriber->safe_psql('postgres',
@@ -110,6 +113,22 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only"
 );
 
+my $publisher_host = $node_publisher->host;
+my $publisher_port = $node_publisher->port;
+$node_subscriber->safe_psql('postgres',
+	"CREATE SERVER tap_sub2_server FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE USER MAPPING FOR PUBLIC SERVER tap_sub2_server OPTIONS (password_required 'false')"
+);
+
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_simple_pub FOR TABLE tab_ins2");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub2 SERVER tap_sub2_server PUBLICATION tap_simple_pub"
+);
+
 # Wait for initial table sync to finish
 $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
@@ -121,11 +140,22 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins");
 is($result, qq(1002), 'check initial data was copied to subscriber');
 
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins2");
+is($result, qq(1002), 'check initial data was copied to subscriber');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr'");
+
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_ins SELECT generate_series(1,50)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20");
 $node_publisher->safe_psql('postgres', "UPDATE tab_ins SET a = -a");
 
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_ins2 SELECT generate_series(1,50)");
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub2 SERVER tap_sub2_server");
+
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_rep SELECT generate_series(1,50)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_rep WHERE a > 20");
@@ -158,6 +188,10 @@ $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_ins");
 is($result, qq(1052|1|1002), 'check replicated inserts on subscriber');
 
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_ins2");
+is($result, qq(1052|1|1002), 'check replicated inserts on subscriber');
+
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_rep");
 is($result, qq(20|-20|-1), 'check replicated changes on subscriber');
@@ -449,10 +483,27 @@ $node_publisher->poll_query_until('postgres',
   or die
   "Timed out while waiting for apply to restart after changing PUBLICATION";
 
+# Test that altering the foreign server used by a subscription causes the
+# apply worker to restart, seen as a new pid in pg_stat_replication.
+$oldpid = $node_publisher->safe_psql('postgres',
+	"SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub2' AND state = 'streaming';"
+);
+$node_subscriber->safe_psql('postgres',
+	"ALTER SERVER tap_sub2_server OPTIONS (sslmode 'disable')"
+);
+$node_publisher->poll_query_until('postgres',
+	"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub2' AND state = 'streaming';"
+  )
+  or die
+  "Timed out while waiting for apply to restart after changing SERVER";
+
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_ins SELECT generate_series(1001,1100)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_rep");
 
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_ins2 SELECT generate_series(1001,1100)");
+
 # Restart the publisher and check the state of the subscriber which
 # should be in a streaming state after catching up.
 $node_publisher->stop('fast');
@@ -465,6 +516,11 @@ $result = $node_subscriber->safe_psql('postgres',
 is($result, qq(1152|1|1100),
 	'check replicated inserts after subscription publication change');
 
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_ins2");
+is($result, qq(1152|1|1100),
+	'check replicated inserts after subscription publication change');
+
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_rep");
 is($result, qq(20|-20|-1),
@@ -533,6 +589,7 @@ $node_publisher->poll_query_until('postgres',
 
 # check all the cleanup
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed");
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub2");
 
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*) FROM pg_subscription");
-- 
2.34.1

Reply via email to