On Fri, 29 Sept 2023 at 04:55, Peter Smith <smithpb2...@gmail.com> wrote:
>
> Some minor review comments for v4-0001:
>
> ======
> src/backend/replication/logical/worker.c
>
> 1.
> + /*
> + * Exit if the owner of the subscription has changed from superuser to a
> + * non-superuser.
> + */
> + if (!newsub->isownersuperuser && MySubscription->isownersuperuser)
> + {
> + if (am_parallel_apply_worker())
> + ereport(LOG,
> + errmsg("logical replication parallel apply worker for subscription
> \"%s\" will stop because subscription owner has become non-superuser",
> +    MySubscription->name));
> + else
> + ereport(LOG,
> + errmsg("logical replication worker for subscription \"%s\" will
> restart because subscription owner has become non-superuser",
> +    MySubscription->name));
> +
> + apply_worker_exit();
> + }
> +
>
> /because subscription owner has become non-superuser/because the
> subscription owner has become a non-superuser/  (in 2 places)

Modified

> ======
> src/include/catalog/pg_subscription.h
>
> 2.
>   char    *origin; /* Only publish data originating from the
>   * specified origin */
> + bool isownersuperuser; /* Is subscription owner superuser? */
>  } Subscription;
>
> ~
>
> 2a.
> Would it be better to put this new field adjacent to the existing
> 'owner' field, since they kind of belong together?

Modified

> ~
>
> 2b.
> None of the other bool fields here has an 'is' prefix, so you could
> consider a shorter field name, like 'ownersuperuser' or
> 'superuserowner', etc.

Modified

The attached v5 patch includes these changes.

Regards,
Vignesh
From da332a17f531597df2ded354e934e58fab679418 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Fri, 22 Sep 2023 15:12:23 +0530
Subject: [PATCH v5] Restart the apply worker if the subscription owner has
 changed from superuser to non-superuser.

Restart the apply worker if the subscription owner has changed from a
superuser to a non-superuser. This is required so that the
subscription's connection string is revalidated, catching cases where
a non-superuser owner's connection string does not specify the
password option.
---
 src/backend/catalog/pg_subscription.c       |  3 +++
 src/backend/commands/subscriptioncmds.c     |  4 +--
 src/backend/replication/logical/tablesync.c |  3 +--
 src/backend/replication/logical/worker.c    | 30 ++++++++++++++++++---
 src/include/catalog/pg_subscription.h       |  1 +
 src/test/subscription/t/027_nosuperuser.pl  | 24 +++++++++++++++++
 6 files changed, 58 insertions(+), 7 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d07f88ce28..47a341431c 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -108,6 +108,9 @@ GetSubscription(Oid subid, bool missing_ok)
 								   Anum_pg_subscription_suborigin);
 	sub->origin = TextDatumGetCString(datum);
 
+	/* Get superuser for subscription owner */
+	sub->ownersuperuser = superuser_arg(sub->owner);
+
 	ReleaseSysCache(tup);
 
 	return sub;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 6fe111e98d..608459504a 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -869,7 +869,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 	load_file("libpqwalreceiver", false);
 
 	/* Try to connect to the publisher. */
-	must_use_password = !superuser_arg(sub->owner) && sub->passwordrequired;
+	must_use_password = !sub->ownersuperuser && sub->passwordrequired;
 	wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
 							sub->name, &err);
 	if (!wrconn)
@@ -1249,7 +1249,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 			load_file("libpqwalreceiver", false);
 			/* Check the connection info string. */
 			walrcv_check_conninfo(stmt->conninfo,
-								  sub->passwordrequired && !superuser_arg(sub->owner));
+								  sub->passwordrequired && !sub->ownersuperuser);
 
 			values[Anum_pg_subscription_subconninfo - 1] =
 				CStringGetTextDatum(stmt->conninfo);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e2cee92cf2..d87ac474b1 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1278,9 +1278,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 	/* Is the use of a password mandatory? */
 	must_use_password = MySubscription->passwordrequired &&
-		!superuser_arg(MySubscription->owner);
+		!MySubscription->ownersuperuser;
 
-	/* Note that the superuser_arg call can access the DB */
 	CommitTransactionCommand();
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 597947410f..d056407419 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3966,6 +3966,24 @@ maybe_reread_subscription(void)
 		apply_worker_exit();
 	}
 
+	/*
+	 * Exit if the owner of the subscription has changed from superuser to a
+	 * non-superuser.
+	 */
+	if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
+	{
+		if (am_parallel_apply_worker())
+			ereport(LOG,
+					errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner has become a non-superuser",
+						   MySubscription->name));
+		else
+			ereport(LOG,
+					errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner has become a non-superuser",
+						   MySubscription->name));
+
+		apply_worker_exit();
+	}
+
 	/* Check for other changes that should never happen too. */
 	if (newsub->dbid != MySubscription->dbid)
 	{
@@ -4495,9 +4513,8 @@ run_apply_worker()
 
 	/* Is the use of a password mandatory? */
 	must_use_password = MySubscription->passwordrequired &&
-		!superuser_arg(MySubscription->owner);
+		!MySubscription->ownersuperuser;
 
-	/* Note that the superuser_arg call can access the DB */
 	CommitTransactionCommand();
 
 	LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
@@ -4621,11 +4638,18 @@ InitializeLogRepWorker(void)
 	SetConfigOption("synchronous_commit", MySubscription->synccommit,
 					PGC_BACKEND, PGC_S_OVERRIDE);
 
-	/* Keep us informed about subscription changes. */
+	/*
+	 * Keep us informed about subscription changes or pg_authid rows.
+	 * (superuser can become non-superuser.)
+	 */
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
 								  subscription_change_cb,
 								  (Datum) 0);
 
+	CacheRegisterSyscacheCallback(AUTHOID,
+								  subscription_change_cb,
+								  (Datum) 0);
+
 	if (am_tablesync_worker())
 		ereport(LOG,
 				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index be36c4a820..6352cf70d5 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -127,6 +127,7 @@ typedef struct Subscription
 								 * skipped */
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
+	bool		ownersuperuser; /* Is subscription owner superuser? */
 	bool		enabled;		/* Indicates if the subscription is enabled */
 	bool		binary;			/* Indicates if the subscription wants data in
 								 * binary format */
diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl
index d7a7e3ef5b..7be6d296a3 100644
--- a/src/test/subscription/t/027_nosuperuser.pl
+++ b/src/test/subscription/t/027_nosuperuser.pl
@@ -104,6 +104,7 @@ for my $node ($node_publisher, $node_subscriber)
   CREATE ROLE regress_admin SUPERUSER LOGIN;
   CREATE ROLE regress_alice NOSUPERUSER LOGIN;
   GRANT CREATE ON DATABASE postgres TO regress_alice;
+  GRANT PG_CREATE_SUBSCRIPTION TO regress_alice;
   SET SESSION AUTHORIZATION regress_alice;
   CREATE SCHEMA alice;
   GRANT USAGE ON SCHEMA alice TO regress_admin;
@@ -303,4 +304,27 @@ GRANT SELECT ON alice.unpartitioned TO regress_alice;
 expect_replication("alice.unpartitioned", 3, 17, 21,
 	"restoring SELECT permission permits replication to continue");
 
+# The apply worker should get restarted after the superuser privileges are
+# revoked for subscription owner alice.
+grant_superuser("regress_alice");
+$node_subscriber->safe_psql(
+	'postgres', qq(
+SET SESSION AUTHORIZATION regress_alice;
+CREATE SUBSCRIPTION regression_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
+));
+
+# Wait for initial sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher,
+	'regression_sub');
+
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+revoke_superuser("regress_alice");
+
+# After the user becomes non-superuser the apply worker should be restarted.
+$node_subscriber->wait_for_log(
+	qr/LOG: ( [A-Z0-9]+:)? logical replication worker for subscription \"regression_sub\" will restart because the subscription owner has become a non-superuser/,
+	$offset);
+
 done_testing();
-- 
2.34.1

From 90f8d6ff5549e0ccab8bfb0ea55b1eade23cf5a5 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Fri, 29 Sep 2023 11:01:18 +0530
Subject: [PATCH v5] Restart the apply worker if the subscription owner has 
 changed from superuser to non-superuser.

Restart the apply worker if the subscription owner has changed from a
superuser to a non-superuser. This is required so that the
subscription's connection string is revalidated, catching cases where
a non-superuser owner's connection string does not specify the
password option.
---
 src/backend/catalog/pg_subscription.c       |  3 +++
 src/backend/commands/subscriptioncmds.c     |  4 +--
 src/backend/replication/logical/tablesync.c |  3 +--
 src/backend/replication/logical/worker.c    | 30 ++++++++++++++++++---
 src/include/catalog/pg_subscription.h       |  1 +
 src/test/subscription/t/027_nosuperuser.pl  | 24 +++++++++++++++++
 6 files changed, 58 insertions(+), 7 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d07f88ce28..47a341431c 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -108,6 +108,9 @@ GetSubscription(Oid subid, bool missing_ok)
 								   Anum_pg_subscription_suborigin);
 	sub->origin = TextDatumGetCString(datum);
 
+	/* Get superuser for subscription owner */
+	sub->ownersuperuser = superuser_arg(sub->owner);
+
 	ReleaseSysCache(tup);
 
 	return sub;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 6fe111e98d..608459504a 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -869,7 +869,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 	load_file("libpqwalreceiver", false);
 
 	/* Try to connect to the publisher. */
-	must_use_password = !superuser_arg(sub->owner) && sub->passwordrequired;
+	must_use_password = !sub->ownersuperuser && sub->passwordrequired;
 	wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
 							sub->name, &err);
 	if (!wrconn)
@@ -1249,7 +1249,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 			load_file("libpqwalreceiver", false);
 			/* Check the connection info string. */
 			walrcv_check_conninfo(stmt->conninfo,
-								  sub->passwordrequired && !superuser_arg(sub->owner));
+								  sub->passwordrequired && !sub->ownersuperuser);
 
 			values[Anum_pg_subscription_subconninfo - 1] =
 				CStringGetTextDatum(stmt->conninfo);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6d461654ab..0e471a7698 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1265,9 +1265,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 	/* Is the use of a password mandatory? */
 	must_use_password = MySubscription->passwordrequired &&
-		!superuser_arg(MySubscription->owner);
+		!MySubscription->ownersuperuser;
 
-	/* Note that the superuser_arg call can access the DB */
 	CommitTransactionCommand();
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 832b1cf764..8120302b90 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3958,6 +3958,24 @@ maybe_reread_subscription(void)
 		apply_worker_exit();
 	}
 
+	/*
+	 * Exit if the owner of the subscription has changed from superuser to a
+	 * non-superuser.
+	 */
+	if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
+	{
+		if (am_parallel_apply_worker())
+			ereport(LOG,
+					errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner has become a non-superuser",
+						   MySubscription->name));
+		else
+			ereport(LOG,
+					errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner has become a non-superuser",
+						   MySubscription->name));
+
+		apply_worker_exit();
+	}
+
 	/* Check for other changes that should never happen too. */
 	if (newsub->dbid != MySubscription->dbid)
 	{
@@ -4500,11 +4518,18 @@ InitializeApplyWorker(void)
 	SetConfigOption("synchronous_commit", MySubscription->synccommit,
 					PGC_BACKEND, PGC_S_OVERRIDE);
 
-	/* Keep us informed about subscription changes. */
+	/*
+	 * Keep us informed about subscription changes or pg_authid rows.
+	 * (superuser can become non-superuser.)
+	 */
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
 								  subscription_change_cb,
 								  (Datum) 0);
 
+	CacheRegisterSyscacheCallback(AUTHOID,
+								  subscription_change_cb,
+								  (Datum) 0);
+
 	if (am_tablesync_worker())
 		ereport(LOG,
 				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
@@ -4602,9 +4627,8 @@ ApplyWorkerMain(Datum main_arg)
 
 		/* Is the use of a password mandatory? */
 		must_use_password = MySubscription->passwordrequired &&
-			!superuser_arg(MySubscription->owner);
+			!MySubscription->ownersuperuser;
 
-		/* Note that the superuser_arg call can access the DB */
 		CommitTransactionCommand();
 
 		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 1d40eebc78..2baa441f3b 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -127,6 +127,7 @@ typedef struct Subscription
 								 * skipped */
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
+	bool		ownersuperuser; /* Is subscription owner superuser? */
 	bool		enabled;		/* Indicates if the subscription is enabled */
 	bool		binary;			/* Indicates if the subscription wants data in
 								 * binary format */
diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl
index d7a7e3ef5b..7be6d296a3 100644
--- a/src/test/subscription/t/027_nosuperuser.pl
+++ b/src/test/subscription/t/027_nosuperuser.pl
@@ -104,6 +104,7 @@ for my $node ($node_publisher, $node_subscriber)
   CREATE ROLE regress_admin SUPERUSER LOGIN;
   CREATE ROLE regress_alice NOSUPERUSER LOGIN;
   GRANT CREATE ON DATABASE postgres TO regress_alice;
+  GRANT PG_CREATE_SUBSCRIPTION TO regress_alice;
   SET SESSION AUTHORIZATION regress_alice;
   CREATE SCHEMA alice;
   GRANT USAGE ON SCHEMA alice TO regress_admin;
@@ -303,4 +304,27 @@ GRANT SELECT ON alice.unpartitioned TO regress_alice;
 expect_replication("alice.unpartitioned", 3, 17, 21,
 	"restoring SELECT permission permits replication to continue");
 
+# The apply worker should get restarted after the superuser privileges are
+# revoked for subscription owner alice.
+grant_superuser("regress_alice");
+$node_subscriber->safe_psql(
+	'postgres', qq(
+SET SESSION AUTHORIZATION regress_alice;
+CREATE SUBSCRIPTION regression_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
+));
+
+# Wait for initial sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher,
+	'regression_sub');
+
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+revoke_superuser("regress_alice");
+
+# After the user becomes non-superuser the apply worker should be restarted.
+$node_subscriber->wait_for_log(
+	qr/LOG: ( [A-Z0-9]+:)? logical replication worker for subscription \"regression_sub\" will restart because the subscription owner has become a non-superuser/,
+	$offset);
+
 done_testing();
-- 
2.34.1

Reply via email to