From cc8e820799d4423f329346e43b82e3bf0216842c Mon Sep 17 00:00:00 2001
From: Mark Dilger <mark.dilger@enterprisedb.com>
Date: Tue, 2 Nov 2021 15:08:54 -0700
Subject: [PATCH v2 3/3] Respect permissions within logical replication

Prevent logical replication workers from performing insert, update,
delete, truncate, or copy commands on tables unless the subscription
owner has permission to do so.  This makes it much safer to allow
non-superusers to create subscriptions, since they can only cause
changes to replicate into schemas and tables that they would be able
to directly make themselves.  This also reduces the amount of trust
that a subscriber must have in a publisher; if the subscription is
set up by a DBA using an account that is intentionally restricted to
only those tables or schemas that the DBA expects the publication to
touch, then the DBA can create and refresh the subscription
without fear that a malicious publisher will have added other
critical tables to the publication and thereby overwrite them on the
subscriber side.  In many setups this is likely a non-issue, as the
same DBA manages both the publisher and the subscriber, but it seems
wiser not to hard-code that assumption into the security model of
logical replication.
---
 doc/src/sgml/logical-replication.sgml       |  35 ++++--
 doc/src/sgml/ref/alter_subscription.sgml    |   7 +-
 src/backend/commands/subscriptioncmds.c     |   2 +
 src/backend/replication/logical/tablesync.c |  13 +++
 src/backend/replication/logical/worker.c    |  30 +++++
 src/test/subscription/t/026_nosuperuser.pl  | 118 ++++++++++++++++++++
 6 files changed, 194 insertions(+), 11 deletions(-)
 create mode 100644 src/test/subscription/t/026_nosuperuser.pl

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 45b2e1e28f..5968958f03 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -330,6 +330,15 @@
    will simply be skipped.
   </para>
 
+  <para>
+   Logical replication operations are performed with the privileges of the role
+   which owns the subscription.  Only superusers may create subscriptions, but
+   if the ownership of the subscription is transferred, or if the owner
+   subsequently has superuser privileges revoked, permissions failures can
+   cause conflicts.  (Note that <productname>PostgreSQL</productname> prior to
+   version 15.0 did no permissions checking when applying changes.)
+  </para>
+
   <para>
    A conflict will produce an error and will stop the replication; it must be
    resolved manually by the user.  Details about the conflict can be found in
@@ -337,7 +346,7 @@
   </para>
 
   <para>
-   The resolution can be done either by changing data on the subscriber so
+   The resolution can be done either by changing data or permissions on the subscriber so
    that it does not conflict with the incoming change or by skipping the
    transaction that conflicts with the existing data.  The transaction can be
    skipped by calling the <link linkend="pg-replication-origin-advance">
@@ -530,9 +539,9 @@
 
   <para>
    A user able to modify the schema of subscriber-side tables can execute
-   arbitrary code as a superuser.  Limit ownership
-   and <literal>TRIGGER</literal> privilege on such tables to roles that
-   superusers trust.  Moreover, if untrusted users can create tables, use only
+   arbitrary code as the role which owns any subscription which modifies those tables.  Limit ownership
+   and <literal>TRIGGER</literal> privilege on such tables to trusted roles.
+   Moreover, if untrusted users can create tables, use only
    publications that list tables explicitly.  That is to say, create a
    subscription <literal>FOR ALL TABLES</literal> or
    <literal>FOR ALL TABLES IN SCHEMA</literal> only when superusers trust
@@ -571,18 +580,26 @@
   </para>
 
   <para>
-   To create a subscription, the user must be a superuser.
+   To create a subscription, the user must be a superuser.  The ownership can
+   subsequently be transferred, or the owner's superuser privilege can be revoked.
   </para>
 
   <para>
    The subscription apply process will run in the local database with the
-   privileges of a superuser.
+   privileges of the subscription owner.
+  </para>
+
+  <para>
+   On the publisher, privileges are only checked once at the start of a
+   replication connection and are not re-checked as each change record is read.
   </para>
 
   <para>
-   Privileges are only checked once at the start of a replication connection.
-   They are not re-checked as each change record is read from the publisher,
-   nor are they re-checked for each change when applied.
+   On the subscriber, the subscription owner's privileges are re-checked for
+   each change record when applied. If a worker is in the process of applying a
+   change record when the ownership of the subscription is changed by a
+   concurrent transaction, the application of the change record may finish
+   under the old permissions.
   </para>
  </sect1>
 
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index bc52339eba..41bf16e740 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -45,8 +45,11 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
 
   <para>
    You must own the subscription to use <command>ALTER SUBSCRIPTION</command>.
-   To alter the owner, you must also be a direct or indirect member of the
-   new owning role.
+   To alter the owner, you must also be a direct or indirect member of the new
+   owning role. Replicated changes will be applied to the target relations as
+   the subscription owner.  If the subscription owner lacks sufficient
+   privileges on the target relations, the replication workers will fail to
+   apply the changes.
   </para>
 
   <para>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d75183cd12..0b17a582dd 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1489,6 +1489,8 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId,
 							  form->oid, 0);
+
+	ApplyLauncherWakeupAtCommit();
 }
 
 /*
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a43c..2400ef8c45 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -111,6 +111,7 @@
 #include "replication/origin.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
+#include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -924,6 +925,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	char		relstate;
 	XLogRecPtr	relstate_lsn;
 	Relation	rel;
+	AclResult	aclresult;
 	WalRcvExecResult *res;
 	char		originname[NAMEDATALEN];
 	RepOriginId originid;
@@ -1042,6 +1044,17 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	 */
 	rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
 
+	/*
+	 * Check that our table sync worker has permission to insert into the
+	 * target table.
+	 */
+	aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
+								  ACL_INSERT);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult,
+					   get_relkind_objtype(rel->rd_rel->relkind),
+					   RelationGetRelationName(rel));
+
 	/*
 	 * Start a transaction in the remote node in REPEATABLE READ mode.  This
 	 * ensures that both the replication slot we create (see below) and the
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 8d96c926b4..6ae0b3fb21 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -179,6 +179,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "tcop/tcopprot.h"
+#include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/catcache.h"
 #include "utils/dynahash.h"
@@ -1537,6 +1538,7 @@ apply_handle_insert(StringInfo s)
 	LogicalRepRelMapEntry *rel;
 	LogicalRepTupleData newtup;
 	LogicalRepRelId relid;
+	AclResult	aclresult;
 	ApplyExecutionData *edata;
 	EState	   *estate;
 	TupleTableSlot *remoteslot;
@@ -1559,6 +1561,12 @@ apply_handle_insert(StringInfo s)
 		end_replication_step();
 		return;
 	}
+	aclresult = pg_class_aclcheck(RelationGetRelid(rel->localrel), GetUserId(),
+								  ACL_INSERT);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult,
+					   get_relkind_objtype(rel->localrel->rd_rel->relkind),
+					   get_rel_name(rel->localreloid));
 
 	/* Set relation for error callback */
 	apply_error_callback_arg.rel = rel;
@@ -1659,6 +1667,7 @@ apply_handle_update(StringInfo s)
 {
 	LogicalRepRelMapEntry *rel;
 	LogicalRepRelId relid;
+	AclResult	aclresult;
 	ApplyExecutionData *edata;
 	EState	   *estate;
 	LogicalRepTupleData oldtup;
@@ -1686,6 +1695,12 @@ apply_handle_update(StringInfo s)
 		end_replication_step();
 		return;
 	}
+	aclresult = pg_class_aclcheck(RelationGetRelid(rel->localrel), GetUserId(),
+								  ACL_UPDATE);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult,
+					   get_relkind_objtype(rel->localrel->rd_rel->relkind),
+					   get_rel_name(rel->localreloid));
 
 	/* Set relation for error callback */
 	apply_error_callback_arg.rel = rel;
@@ -1826,6 +1841,7 @@ apply_handle_delete(StringInfo s)
 	LogicalRepRelMapEntry *rel;
 	LogicalRepTupleData oldtup;
 	LogicalRepRelId relid;
+	AclResult	aclresult;
 	ApplyExecutionData *edata;
 	EState	   *estate;
 	TupleTableSlot *remoteslot;
@@ -1848,6 +1864,12 @@ apply_handle_delete(StringInfo s)
 		end_replication_step();
 		return;
 	}
+	aclresult = pg_class_aclcheck(RelationGetRelid(rel->localrel), GetUserId(),
+								  ACL_DELETE);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult,
+					   get_relkind_objtype(rel->localrel->rd_rel->relkind),
+					   get_rel_name(rel->localreloid));
 
 	/* Set relation for error callback */
 	apply_error_callback_arg.rel = rel;
@@ -2220,6 +2242,7 @@ apply_handle_truncate(StringInfo s)
 	{
 		LogicalRepRelId relid = lfirst_oid(lc);
 		LogicalRepRelMapEntry *rel;
+		AclResult	aclresult;
 
 		rel = logicalrep_rel_open(relid, lockmode);
 		if (!should_apply_changes_for_rel(rel))
@@ -2231,6 +2254,12 @@ apply_handle_truncate(StringInfo s)
 			logicalrep_rel_close(rel, lockmode);
 			continue;
 		}
+		aclresult = pg_class_aclcheck(RelationGetRelid(rel->localrel),
+									  GetUserId(), ACL_TRUNCATE);
+		if (aclresult != ACLCHECK_OK)
+			aclcheck_error(aclresult,
+						   get_relkind_objtype(rel->localrel->rd_rel->relkind),
+						   get_rel_name(rel->localreloid));
 
 		remote_rels = lappend(remote_rels, rel);
 		rels = lappend(rels, rel->localrel);
@@ -2912,6 +2941,7 @@ maybe_reread_subscription(void)
 		strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
 		newsub->binary != MySubscription->binary ||
 		newsub->stream != MySubscription->stream ||
+		newsub->owner != MySubscription->owner ||
 		!equal(newsub->publications, MySubscription->publications))
 	{
 		ereport(LOG,
diff --git a/src/test/subscription/t/026_nosuperuser.pl b/src/test/subscription/t/026_nosuperuser.pl
new file mode 100644
index 0000000000..03a5ac8c52
--- /dev/null
+++ b/src/test/subscription/t/026_nosuperuser.pl
@@ -0,0 +1,118 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test subscriptions owned by non-superusers
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 3;
+
+# Setup
+
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+# Create the same schema, roles and permissions on both nodes.
+my $sql = qq(
+CREATE ROLE regress_role_alice NOSUPERUSER NOLOGIN;
+GRANT CREATE ON DATABASE postgres TO regress_role_alice;
+SET ROLE regress_role_alice;
+CREATE SCHEMA alice;
+CREATE TABLE alice.tbl1 (i1 INTEGER);
+ALTER TABLE alice.tbl1 REPLICA IDENTITY FULL;
+RESET ROLE;
+
+CREATE ROLE regress_role_bob NOSUPERUSER NOLOGIN;
+GRANT CREATE ON DATABASE postgres TO regress_role_bob;
+SET ROLE regress_role_bob;
+CREATE SCHEMA bob;
+CREATE TABLE bob.tbl1 (i1 INTEGER);
+CREATE TABLE bob.tbl2 (i2 INTEGER);
+ALTER TABLE bob.tbl1 REPLICA IDENTITY FULL;
+ALTER TABLE bob.tbl2 REPLICA IDENTITY FULL;
+RESET ROLE;
+);
+$node_publisher->safe_psql('postgres', $sql);
+$node_subscriber->safe_psql('postgres', $sql);
+
+# Bob publishes.
+$node_publisher->safe_psql('postgres', qq(
+SET ROLE regress_role_bob;
+CREATE PUBLICATION bob_pub FOR TABLE bob.tbl1;
+INSERT INTO bob.tbl1 VALUES (1);
+));
+
+# Bob gets a DBA to create a subscription for him.
+$node_subscriber->safe_psql('postgres', qq(
+CREATE SUBSCRIPTION bob_sub CONNECTION '$publisher_connstr' PUBLICATION bob_pub;
+ALTER SUBSCRIPTION bob_sub OWNER TO regress_role_bob;
+));
+
+# Wait for initial sync of all subscriptions.
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check that the data came through.
+my $result = $node_subscriber->safe_psql('postgres', qq(
+SET ROLE regress_role_bob;
+SELECT i1 FROM bob.tbl1;
+));
+is( $result, '1', 'data from schema bob was replicated');
+
+# A malicious DBA on the publisher adds alice's tables to publication "bob_pub".
+$node_publisher->safe_psql('postgres', qq(
+ALTER PUBLICATION bob_pub ADD TABLE alice.tbl1;
+INSERT INTO alice.tbl1 VALUES (1);
+));
+
+# Bob adds another table on publisher side to the publication and performs
+# DML on the new table.
+$node_publisher->safe_psql('postgres', qq(
+SET ROLE regress_role_bob;
+ALTER PUBLICATION bob_pub ADD TABLE bob.tbl2;
+INSERT INTO bob.tbl2 VALUES (2);
+));
+
+# Bob refreshes on subscriber side to get the new data from "bob.tbl2", not
+# knowing that alice's tables have been added.  An error during tablesync or
+# apply should prevent the data in schema "alice" being replicated.
+#
+$node_subscriber->psql('postgres', qq(
+SET ROLE regress_role_bob;
+ALTER SUBSCRIPTION bob_sub REFRESH PUBLICATION;
+));
+
+# Wait for sync of all subscriptions.
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# The data for schema "bob" might not be replicated, because the replication
+# may abort first on a permissions error for schema "alice".  We insist that
+# data for "alice" not be replicated, but we only insist that if data from
+# "bob" gets replicated that it be the correct new data and not corrupted.
+$result = $node_subscriber->safe_psql('postgres', qq(
+SELECT a.i1::text || '_' ||
+	   coalesce(b.i2::text,'null') || '_' ||
+	   coalesce(c.i1::text,'null')
+	FROM bob.tbl1 a
+	LEFT JOIN bob.tbl2 b ON TRUE
+	LEFT JOIN alice.tbl1 c ON TRUE;
+));
+like ($result, qr/^(?:1_null_null|1_2_null)$/,
+	'subscriber side schema bob data is reasonable and alice data is unreplicated');
+
+# check that the logs on the subscriber side contain the expected permissions
+# error associated with a failed attempt to replicate into schema "alice".
+my $subscriber_log = slurp_file($node_subscriber->logfile);
+like ($subscriber_log, qr/ERROR:  permission denied for schema alice/msi,
+	'subscriber lacks permission for schema alice');
-- 
2.21.1 (Apple Git-122.3)

