From 2c1eccd20b55bbe4be5f4db2e69fe2e807c9c4cb Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Mon, 30 Jan 2023 14:53:44 -0500
Subject: [PATCH v3] Add new predefined role pg_create_subscriptions.

This role can be granted to non-superusers to allow them to issue
CREATE SUBSCRIPTION. Non-superusers are required to specify a password
for authentication and the remote side must use the password, similar
to what is required for postgres_fdw and dblink.

A superuser who wants a non-superuser to own a subscription that
does not rely on password authentication may set the new
password_required=false property on that subscription. A non-superuser
may not modify a subscription with password_required=false, nor may a
non-superuser set this property to false. This works much like the
existing password_required property for postgres_fdw. In both cases,
the actual semantics are that a password is not required if either
(1) the property is set to false or (2) the relevant user is the
superuser.

This commit also allows non-superusers to use ALTER SUBSCRIPTION
.. SKIP. It is good enough to be the owner of the subscription.

XXX. pg_dump changes not included.
XXX. Documentation changes not included.
XXX. Catversion bump required.
---
 src/backend/catalog/pg_subscription.c         |   1 +
 src/backend/catalog/system_views.sql          |   1 +
 src/backend/commands/subscriptioncmds.c       | 132 +++++++++++++++---
 .../libpqwalreceiver/libpqwalreceiver.c       |  52 ++++++-
 src/backend/replication/logical/tablesync.c   |   9 +-
 src/backend/replication/logical/worker.c      |   6 +
 src/backend/replication/walreceiver.c         |   2 +-
 src/include/catalog/pg_authid.dat             |   5 +
 src/include/catalog/pg_subscription.h         |   3 +
 src/include/replication/walreceiver.h         |  12 +-
 src/test/regress/expected/subscription.out    |   4 +-
 src/test/subscription/t/027_nosuperuser.pl    |   2 +-
 12 files changed, 189 insertions(+), 40 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a56ae311c3..087eae0bcb 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
 	sub->disableonerr = subform->subdisableonerr;
+	sub->passwordrequired = subform->subpasswordrequired;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 8608e3fa5b..8465a88a18 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1301,6 +1301,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
               subbinary, substream, subtwophasestate, subdisableonerr,
+			  subpasswordrequired,
               subslotname, subsynccommit, subpublications, suborigin)
     ON pg_subscription TO public;
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 464db6d247..d00266f73a 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -23,9 +23,12 @@
 #include "catalog/namespace.h"
 #include "catalog/objectaccess.h"
 #include "catalog/objectaddress.h"
+#include "catalog/pg_authid_d.h"
+#include "catalog/pg_database_d.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
+#include "commands/dbcommands.h"
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/subscriptioncmds.h"
@@ -64,8 +67,9 @@
 #define SUBOPT_STREAMING			0x00000100
 #define SUBOPT_TWOPHASE_COMMIT		0x00000200
 #define SUBOPT_DISABLE_ON_ERR		0x00000400
-#define SUBOPT_LSN					0x00000800
-#define SUBOPT_ORIGIN				0x00001000
+#define SUBOPT_PASSWORD_REQUIRED	0x00000800
+#define SUBOPT_LSN					0x00001000
+#define SUBOPT_ORIGIN				0x00002000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -88,6 +92,7 @@ typedef struct SubOpts
 	char		streaming;
 	bool		twophase;
 	bool		disableonerr;
+	bool		passwordrequired;
 	char	   *origin;
 	XLogRecPtr	lsn;
 } SubOpts;
@@ -144,6 +149,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->twophase = false;
 	if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
 		opts->disableonerr = false;
+	if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
+		opts->passwordrequired = true;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
 
@@ -274,6 +281,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
 			opts->disableonerr = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
+				 strcmp(defel->defname, "password_required") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
+			opts->passwordrequired = defGetBoolean(defel);
+		}
 		else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
 				 strcmp(defel->defname, "origin") == 0)
 		{
@@ -550,6 +566,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	List	   *publications;
 	bits32		supported_opts;
 	SubOpts		opts = {0};
+	AclResult	aclresult;
 
 	/*
 	 * Parse and check options.
@@ -560,7 +577,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
-					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN);
+					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
+					  SUBOPT_ORIGIN);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -572,10 +590,36 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	if (opts.create_slot)
 		PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
 
-	if (!superuser())
+	/*
+	 * We don't want to allow unprivileged users to be able to trigger attempts
+	 * to access arbitrary network destinations, so require the user to have
+	 * been specifically authorized to create subscriptions.
+	 */
+	if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
 		ereport(ERROR,
 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-				 errmsg("must be superuser to create subscriptions")));
+				 errmsg("must have privileges of pg_create_subscription to create subscriptions")));
+
+	/*
+	 * Since a subscription is a database object, we also check for CREATE
+	 * permission on the database.
+	 */
+	aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
+								owner, ACL_CREATE);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, OBJECT_DATABASE,
+					   get_database_name(MyDatabaseId));
+
+	/*
+	 * Non-superusers are required to set a password for authentication, and
+	 * that password must be used by the target server, but the superuser can
+	 * exempt a subscription from this requirement.
+	 */
+	if (!opts.passwordrequired && !superuser_arg(owner))
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("password_required=false is superuser-only"),
+					 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
 
 	/*
 	 * If built with appropriate switch, whine when regression-testing
@@ -614,7 +658,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	load_file("libpqwalreceiver", false);
 
 	/* Check the connection info string. */
-	walrcv_check_conninfo(conninfo);
+	walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
 
 	/* Everything ok, form a new tuple. */
 	memset(values, 0, sizeof(values));
@@ -636,6 +680,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
 					 LOGICALREP_TWOPHASE_STATE_DISABLED);
 	values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
+	values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -672,9 +717,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		List	   *tables;
 		ListCell   *lc;
 		char		table_state;
+		bool		must_use_password;
 
 		/* Try to connect to the publisher. */
-		wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
+		must_use_password = !superuser_arg(owner) && opts.passwordrequired;
+		wrconn = walrcv_connect(conninfo, true, must_use_password,
+								stmt->subname, &err);
 		if (!wrconn)
 			ereport(ERROR,
 					(errcode(ERRCODE_CONNECTION_FAILURE),
@@ -799,12 +847,15 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 	} SubRemoveRels;
 	SubRemoveRels *sub_remove_rels;
 	WalReceiverConn *wrconn;
+	bool		must_use_password;
 
 	/* Load the library providing us libpq calls. */
 	load_file("libpqwalreceiver", false);
 
 	/* Try to connect to the publisher. */
-	wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+	must_use_password = !superuser_arg(sub->owner) && sub->passwordrequired;
+	wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
+							sub->name, &err);
 	if (!wrconn)
 		ereport(ERROR,
 				(errcode(ERRCODE_CONNECTION_FAILURE),
@@ -1039,6 +1090,16 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	sub = GetSubscription(subid, false);
 
+	/*
+	 * Don't allow non-superuser modification of a subscription with
+	 * password_required=false.
+	 */
+	if (!sub->passwordrequired && !superuser())
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+						 errmsg("password_required=false is superuser-only"),
+						 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
+
 	/* Lock the subscription so nobody else can do anything with it. */
 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
 
@@ -1054,7 +1115,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
-								  SUBOPT_ORIGIN);
+								  SUBOPT_PASSWORD_REQUIRED | SUBOPT_ORIGIN);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1111,6 +1172,21 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 						= true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
+				{
+					/* Non-superuser may not disable password_required. */
+					if (!opts.passwordrequired && !superuser())
+						ereport(ERROR,
+								(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+								 errmsg("password_required=false is superuser-only"),
+								 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
+
+					values[Anum_pg_subscription_subpasswordrequired - 1]
+						= BoolGetDatum(opts.passwordrequired);
+					replaces[Anum_pg_subscription_subpasswordrequired - 1]
+						= true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
 				{
 					values[Anum_pg_subscription_suborigin - 1] =
@@ -1148,7 +1224,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 			/* Load the library providing us libpq calls. */
 			load_file("libpqwalreceiver", false);
 			/* Check the connection info string. */
-			walrcv_check_conninfo(stmt->conninfo);
+			walrcv_check_conninfo(stmt->conninfo,
+								  sub->passwordrequired && !superuser_arg(sub->owner));
 
 			values[Anum_pg_subscription_subconninfo - 1] =
 				CStringGetTextDatum(stmt->conninfo);
@@ -1305,11 +1382,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				/* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
 				Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
 
-				if (!superuser())
-					ereport(ERROR,
-							(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-							 errmsg("must be superuser to skip transaction")));
-
 				/*
 				 * If the user sets subskiplsn, we do a sanity check to make
 				 * sure that the specified LSN is a probable value.
@@ -1379,6 +1451,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	ObjectAddress myself;
 	HeapTuple	tup;
 	Oid			subid;
+	Oid			subowner;
 	Datum		datum;
 	bool		isnull;
 	char	   *subname;
@@ -1391,6 +1464,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	WalReceiverConn *wrconn;
 	Form_pg_subscription form;
 	List	   *rstates;
+	bool		must_use_password;
 
 	/*
 	 * Lock pg_subscription with AccessExclusiveLock to ensure that the
@@ -1420,6 +1494,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 
 	form = (Form_pg_subscription) GETSTRUCT(tup);
 	subid = form->oid;
+	subowner = form->subowner;
+	must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
 
 	/* must be owner */
 	if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
@@ -1578,7 +1654,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 */
 	load_file("libpqwalreceiver", false);
 
-	wrconn = walrcv_connect(conninfo, true, subname, &err);
+	wrconn = walrcv_connect(conninfo, true, must_use_password,
+							subname, &err);
 	if (wrconn == NULL)
 	{
 		if (!slotname)
@@ -1717,6 +1794,7 @@ static void
 AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 {
 	Form_pg_subscription form;
+	AclResult	aclresult;
 
 	form = (Form_pg_subscription) GETSTRUCT(tup);
 
@@ -1727,13 +1805,21 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
 					   NameStr(form->subname));
 
-	/* New owner must be a superuser */
-	if (!superuser_arg(newOwnerId))
-		ereport(ERROR,
-				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-				 errmsg("permission denied to change owner of subscription \"%s\"",
-						NameStr(form->subname)),
-				 errhint("The owner of a subscription must be a superuser.")));
+	/* Must be able to become new owner */
+	check_can_set_role(GetUserId(), newOwnerId);
+
+	/*
+	 * current owner must have CREATE on database
+	 *
+	 * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
+	 * other object types behave differently (e.g. you can't give a table to
+	 * a user who lacks CREATE privileges on a schema).
+	 */
+	aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
+								newOwnerId, ACL_CREATE);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, OBJECT_DATABASE,
+					   get_database_name(MyDatabaseId));
 
 	form->subowner = newOwnerId;
 	CatalogTupleUpdate(rel, &tup->t_self, tup);
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 560ec974fa..1809601b53 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -49,9 +49,10 @@ struct WalReceiverConn
 
 /* Prototypes for interface functions */
 static WalReceiverConn *libpqrcv_connect(const char *conninfo,
-										 bool logical, const char *appname,
-										 char **err);
-static void libpqrcv_check_conninfo(const char *conninfo);
+										 bool logical, bool must_use_password,
+										 const char *appname, char **err);
+static void libpqrcv_check_conninfo(const char *conninfo,
+									bool must_use_password);
 static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
 static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
 									char **sender_host, int *sender_port);
@@ -122,8 +123,8 @@ _PG_init(void)
  * Returns NULL on error and fills the err with palloc'ed error message.
  */
 static WalReceiverConn *
-libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
-				 char **err)
+libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
+				 const char *appname, char **err)
 {
 	WalReceiverConn *conn;
 	const char *keys[6];
@@ -180,6 +181,13 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 	if (PQstatus(conn->streamConn) != CONNECTION_OK)
 		goto bad_connection_errmsg;
 
+	if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
+		ereport(ERROR,
+				(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+				 errmsg("password is required"),
+				 errdetail("Non-superuser cannot connect if the server does not request a password."),
+				 errhint("Target server's authentication method must be changed. or set password_required=false in the subscription attributes.")));
+
 	if (logical)
 	{
 		PGresult   *res;
@@ -212,12 +220,18 @@ bad_connection:
 }
 
 /*
- * Validate connection info string (just try to parse it)
+ * Validate connection info string, and determine whether it might cause
+ * local filesystem access to be attempted.
+ *
+ * If the connection string can't be parsed, this function will raise
+ * an error and will not return. If it can, it will return true if this
+ * connection string specifies a password and false otherwise.
  */
 static void
-libpqrcv_check_conninfo(const char *conninfo)
+libpqrcv_check_conninfo(const char *conninfo, bool must_use_password)
 {
 	PQconninfoOption *opts = NULL;
+	PQconninfoOption *opt;
 	char	   *err = NULL;
 
 	opts = PQconninfoParse(conninfo, &err);
@@ -232,6 +246,30 @@ libpqrcv_check_conninfo(const char *conninfo)
 				 errmsg("invalid connection string syntax: %s", errcopy)));
 	}
 
+	if (must_use_password)
+	{
+		bool	uses_password = false;
+
+		for (opt = opts; opt->keyword != NULL; ++opt)
+		{
+			/* Ignore connection options that are not present. */
+			if (opt->val == NULL)
+				continue;
+
+			if (strcmp(opt->keyword, "password") == 0 && opt->val[0] != '\0')
+			{
+				uses_password = true;
+				break;
+			}
+		}
+
+		if (!uses_password)
+			ereport(ERROR,
+					(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+					 errmsg("password is required"),
+					 errdetail("Non-superusers must provide a password in the connection string.")));
+	}
+
 	PQconninfoFree(opts);
 }
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 07eea504ba..57e4836927 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1237,6 +1237,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	WalRcvExecResult *res;
 	char		originname[NAMEDATALEN];
 	RepOriginId originid;
+	bool		must_use_password;
 
 	/* Check the state of the table synchronization. */
 	StartTransactionCommand();
@@ -1269,13 +1270,19 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 									slotname,
 									NAMEDATALEN);
 
+	/* Is the use of a password mandatory? */
+	must_use_password = MySubscription->passwordrequired &&
+		!superuser_arg(MySubscription->owner);
+
 	/*
 	 * Here we use the slot name instead of the subscription name as the
 	 * application_name, so that it is different from the leader apply worker,
 	 * so that synchronous replication can distinguish them.
 	 */
 	LogRepWorkerWalRcvConn =
-		walrcv_connect(MySubscription->conninfo, true, slotname, &err);
+		walrcv_connect(MySubscription->conninfo, true,
+					   must_use_password,
+					   slotname, &err);
 	if (LogRepWorkerWalRcvConn == NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_CONNECTION_FAILURE),
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index cfb2ab6248..f7128f46fc 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4529,6 +4529,7 @@ ApplyWorkerMain(Datum main_arg)
 		RepOriginId originid;
 		TimeLineID	startpointTLI;
 		char	   *err;
+		bool		must_use_password;
 
 		myslotname = MySubscription->slotname;
 
@@ -4554,7 +4555,12 @@ ApplyWorkerMain(Datum main_arg)
 		origin_startpos = replorigin_session_get_progress(false);
 		CommitTransactionCommand();
 
+		/* Is the use of a password mandatory? */
+		must_use_password = MySubscription->passwordrequired &&
+			!superuser_arg(MySubscription->owner);
+
 		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+												must_use_password,
 												MySubscription->name, &err);
 		if (LogRepWorkerWalRcvConn == NULL)
 			ereport(ERROR,
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b0cfddd548..3c844234d3 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -296,7 +296,7 @@ WalReceiverMain(void)
 	PG_SETMASK(&UnBlockSig);
 
 	/* Establish the connection to the primary for XLOG streaming */
-	wrconn = walrcv_connect(conninfo, false,
+	wrconn = walrcv_connect(conninfo, false, false,
 							cluster_name[0] ? cluster_name : "walreceiver",
 							&err);
 	if (!wrconn)
diff --git a/src/include/catalog/pg_authid.dat b/src/include/catalog/pg_authid.dat
index f2e5663c9f..8920f3027e 100644
--- a/src/include/catalog/pg_authid.dat
+++ b/src/include/catalog/pg_authid.dat
@@ -94,5 +94,10 @@
   rolcreaterole => 'f', rolcreatedb => 'f', rolcanlogin => 'f',
   rolreplication => 'f', rolbypassrls => 'f', rolconnlimit => '-1',
   rolpassword => '_null_', rolvaliduntil => '_null_' },
+{ oid => '9535', oid_symbol => 'ROLE_PG_CREATE_SUBSCRIPTION',
+  rolname => 'pg_create_subscription', rolsuper => 'f', rolinherit => 't',
+  rolcreaterole => 'f', rolcreatedb => 'f', rolcanlogin => 'f',
+  rolreplication => 'f', rolbypassrls => 'f', rolconnlimit => '-1',
+  rolpassword => '_null_', rolvaliduntil => '_null_' },
 
 ]
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index b0f2a1705d..6319f598d8 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -88,6 +88,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subdisableonerr;	/* True if a worker error should cause the
 									 * subscription to be disabled */
 
+	bool		subpasswordrequired; /* Must connection use a password? */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -131,6 +133,7 @@ typedef struct Subscription
 	bool		disableonerr;	/* Indicates if the subscription should be
 								 * automatically disabled if a worker error
 								 * occurs */
+	bool		passwordrequired;	/* Must connection use a password? */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index decffe352d..281626fa6f 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -239,6 +239,7 @@ typedef struct WalRcvExecResult
  */
 typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo,
 											   bool logical,
+											   bool must_use_password,
 											   const char *appname,
 											   char **err);
 
@@ -247,7 +248,8 @@ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo,
  *
  * Parse and validate the connection string given as of 'conninfo'.
  */
-typedef void (*walrcv_check_conninfo_fn) (const char *conninfo);
+typedef void (*walrcv_check_conninfo_fn) (const char *conninfo,
+										  bool must_use_password);
 
 /*
  * walrcv_get_conninfo_fn
@@ -405,10 +407,10 @@ typedef struct WalReceiverFunctionsType
 
 extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 
-#define walrcv_connect(conninfo, logical, appname, err) \
-	WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err)
-#define walrcv_check_conninfo(conninfo) \
-	WalReceiverFunctions->walrcv_check_conninfo(conninfo)
+#define walrcv_connect(conninfo, logical, must_use_password, appname, err) \
+	WalReceiverFunctions->walrcv_connect(conninfo, logical, must_use_password, appname, err)
+#define walrcv_check_conninfo(conninfo, must_use_password) \
+	WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password)
 #define walrcv_get_conninfo(conn) \
 	WalReceiverFunctions->walrcv_get_conninfo(conn)
 #define walrcv_get_senderinfo(conn, sender_host, sender_port) \
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 3f99b14394..582652f422 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -78,7 +78,7 @@ ERROR:  subscription "regress_testsub" already exists
 -- fail - must be superuser
 SET SESSION AUTHORIZATION 'regress_subscription_user2';
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION foo WITH (connect = false);
-ERROR:  must be superuser to create subscriptions
+ERROR:  must have privileges of pg_create_subscription to create subscriptions
 SET SESSION AUTHORIZATION 'regress_subscription_user';
 -- fail - invalid option combinations
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true);
@@ -221,7 +221,7 @@ ALTER SUBSCRIPTION regress_testsub_foo RENAME TO regress_testsub;
 -- fail - new owner must be superuser
 ALTER SUBSCRIPTION regress_testsub OWNER TO regress_subscription_user2;
 ERROR:  permission denied to change owner of subscription "regress_testsub"
-HINT:  The owner of a subscription must be a superuser.
+HINT:  The owner of a subscription must have the privileges of pg_create_subscription.
 ALTER ROLE regress_subscription_user2 SUPERUSER;
 -- now it works
 ALTER SUBSCRIPTION regress_testsub OWNER TO regress_subscription_user2;
diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl
index 59192dbe2f..e770e0615c 100644
--- a/src/test/subscription/t/027_nosuperuser.pl
+++ b/src/test/subscription/t/027_nosuperuser.pl
@@ -150,7 +150,7 @@ CREATE PUBLICATION alice
 $node_subscriber->safe_psql(
 	'postgres', qq(
 SET SESSION AUTHORIZATION regress_admin;
-CREATE SUBSCRIPTION admin_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
+CREATE SUBSCRIPTION admin_sub CONNECTION '$publisher_connstr' PUBLICATION alice WITH (password_required=false);
 ));
 
 # Wait for initial sync to finish
-- 
2.37.1 (Apple Git-137.1)

