New patch set based on the discussions.  I have dropped the PUBLICATION
privilege patch.  The patches are also reordered a bit in approximate
decreasing priority order.

0001 Refine rules for altering publication owner

kind of a bug fix

0002 Change logical replication pg_hba.conf use

This was touched upon in the discussion at
<https://www.postgresql.org/message-id/flat/CAB7nPqRf8eOv15SPQJbC1npJoDWTNPMTNp6AvMN-XWwB53h2Cg%40mail.gmail.com>
and seems to have been viewed favorably there.

0003 Add USAGE privilege for publications

a way to control who can subscribe to a publication

0004 Add subscription apply worker privilege checks

This is a prerequisite for the next one (or one like it).

0005 Add CREATE SUBSCRIPTION privilege on databases

Need a way to determine which user can create subscriptions.  The
presented approach made sense to me, but maybe there are other ideas.

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 676e6e26f1b0500907c0cd810969bb015ee548d1 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <peter_e@gmx.net>
Date: Mon, 13 Feb 2017 08:57:45 -0500
Subject: [PATCH v2 1/5] Refine rules for altering publication owner

Previously, the new owner had to be a superuser.  The new rules are more
refined similar to other objects.
---
 doc/src/sgml/ref/alter_publication.sgml   |  7 +++++--
 src/backend/commands/publicationcmds.c    | 34 ++++++++++++++++++++++---------
 src/test/regress/expected/publication.out |  8 ++++++++
 src/test/regress/sql/publication.sql      |  4 ++++
 4 files changed, 41 insertions(+), 12 deletions(-)

diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index 47d83b80be..776661bbeb 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -48,8 +48,11 @@ <title>Description</title>
   </para>
 
   <para>
-   To alter the owner, you must also be a direct or indirect member of the
-   new owning role. The new owner has to be a superuser
+   To alter the owner, you must also be a direct or indirect member of the new
+   owning role. The new owner must have <literal>CREATE</literal> privilege on
+   the database.  Also, the new owner of a <literal>FOR ALL TABLES</literal>
+   publication must be a superuser.  However, a superuser can change the
+   ownership of a publication while circumventing these restrictions.
   </para>
 
   <para>
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 04f83e0a2e..d69e39dc5b 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -670,17 +670,31 @@ AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 	if (form->pubowner == newOwnerId)
 		return;
 
-	if (!pg_publication_ownercheck(HeapTupleGetOid(tup), GetUserId()))
-		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_PUBLICATION,
-					   NameStr(form->pubname));
+	if (!superuser())
+	{
+		AclResult	aclresult;
 
-	/* New owner must be a superuser */
-	if (!superuser_arg(newOwnerId))
-		ereport(ERROR,
-				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-				 errmsg("permission denied to change owner of publication \"%s\"",
-						NameStr(form->pubname)),
-				 errhint("The owner of a publication must be a superuser.")));
+		/* Must be owner */
+		if (!pg_publication_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_PUBLICATION,
+						   NameStr(form->pubname));
+
+		/* Must be able to become new owner */
+		check_is_member_of_role(GetUserId(), newOwnerId);
+
+		/* New owner must have CREATE privilege on database */
+		aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
+		if (aclresult != ACLCHECK_OK)
+			aclcheck_error(aclresult, ACL_KIND_DATABASE,
+						   get_database_name(MyDatabaseId));
+
+		if (form->puballtables && !superuser_arg(newOwnerId))
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("permission denied to change owner of publication \"%s\"",
+							NameStr(form->pubname)),
+					 errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
+	}
 
 	form->pubowner = newOwnerId;
 	CatalogTupleUpdate(rel, &tup->t_self, tup);
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 7c4834b213..5a7c0edf7d 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -182,6 +182,14 @@ ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
 
 -- rename back to keep the rest simple
 ALTER PUBLICATION testpub_foo RENAME TO testpub_default;
+ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
+\dRp testpub_default
+                           List of publications
+      Name       |           Owner           | Inserts | Updates | Deletes 
+-----------------+---------------------------+---------+---------+---------
+ testpub_default | regress_publication_user2 | t       | t       | t
+(1 row)
+
 DROP PUBLICATION testpub_default;
 DROP PUBLICATION testpib_ins_trunct;
 DROP PUBLICATION testpub_fortbl;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 46d275acc5..cff9931a77 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -108,6 +108,10 @@ CREATE PUBLICATION testpub2;  -- ok
 -- rename back to keep the rest simple
 ALTER PUBLICATION testpub_foo RENAME TO testpub_default;
 
+ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
+
+\dRp testpub_default
+
 DROP PUBLICATION testpub_default;
 DROP PUBLICATION testpib_ins_trunct;
 DROP PUBLICATION testpub_fortbl;
-- 
2.12.0

From 82ef38dbdb2118aaef15f9974167b5518707db2c Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <peter_e@gmx.net>
Date: Mon, 13 Feb 2017 16:50:29 -0500
Subject: [PATCH v2 2/5] Change logical replication pg_hba.conf use

Logical replication no longer uses the "replication" keyword.  It just
matches database entries in the normal way.  The "replication" keyword
now only applies to physical replication.
---
 doc/src/sgml/client-auth.sgml         | 2 +-
 doc/src/sgml/logical-replication.sgml | 8 +++-----
 src/backend/libpq/hba.c               | 4 ++--
 3 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/client-auth.sgml b/doc/src/sgml/client-auth.sgml
index bbd52a5418..d6b8c04edc 100644
--- a/doc/src/sgml/client-auth.sgml
+++ b/doc/src/sgml/client-auth.sgml
@@ -193,7 +193,7 @@ <title>The <filename>pg_hba.conf</filename> File</title>
        members of the role, directly or indirectly, and not just by
        virtue of being a superuser.
        The value <literal>replication</> specifies that the record
-       matches if a replication connection is requested (note that
+       matches if a physical replication connection is requested (note that
        replication connections do not specify any particular database).
        Otherwise, this is the name of
        a specific <productname>PostgreSQL</productname> database.
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index a6c04e923d..6da39d25e3 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -295,11 +295,9 @@ <title>Monitoring</title>
   <title>Security</title>
 
   <para>
-   Logical replication connections occur in the same way as with physical streaming
-   replication.  It requires access to be explicitly given using
-   <filename>pg_hba.conf</filename>.  The role used for the replication
-   connection must have the <literal>REPLICATION</literal> attribute.  This
-   gives a role access to both logical and physical replication.
+   The role used for the replication connection must have
+   the <literal>REPLICATION</literal> attribute.  Access for the role must be
+   configured in <filename>pg_hba.conf</filename>.
   </para>
 
   <para>
diff --git a/src/backend/libpq/hba.c b/src/backend/libpq/hba.c
index 3817d249c4..7abcae618d 100644
--- a/src/backend/libpq/hba.c
+++ b/src/backend/libpq/hba.c
@@ -612,9 +612,9 @@ check_db(const char *dbname, const char *role, Oid roleid, List *tokens)
 	foreach(cell, tokens)
 	{
 		tok = lfirst(cell);
-		if (am_walsender)
+		if (am_walsender && !am_db_walsender)
 		{
-			/* walsender connections can only match replication keyword */
+			/* physical replication walsender connections can only match replication keyword */
 			if (token_is_keyword(tok, "replication"))
 				return true;
 		}
-- 
2.12.0

From dd96a267f96e21ea390c5de46058c3158a5feef8 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <peter_e@gmx.net>
Date: Wed, 2 Nov 2016 12:00:00 -0400
Subject: [PATCH v2 3/5] Add USAGE privilege for publications

Add pg_publication.pubacl column and associated infrastructure so that
publications can have privileges.  USAGE privilege on the publication is
now required for a connecting subscription to be able to use it.
Previously, any connecting user could use any publication, which was not
unreasonable because that user needs to have the REPLICATION attribute,
which is pretty powerful anyway, but we might want to move away from
that, and this gives finer control.
---
 doc/src/sgml/catalogs.sgml                  |  12 ++
 doc/src/sgml/func.sgml                      |  27 ++++
 doc/src/sgml/logical-replication.sgml       |   5 +
 doc/src/sgml/ref/grant.sgml                 |   8 ++
 doc/src/sgml/ref/revoke.sgml                |   6 +
 src/backend/catalog/aclchk.c                | 213 ++++++++++++++++++++++++++++
 src/backend/commands/event_trigger.c        |   1 +
 src/backend/commands/publicationcmds.c      |   2 +
 src/backend/parser/gram.y                   |   8 ++
 src/backend/replication/pgoutput/pgoutput.c |   8 ++
 src/backend/utils/adt/acl.c                 | 200 ++++++++++++++++++++++++++
 src/bin/pg_dump/dumputils.c                 |   2 +
 src/bin/pg_dump/pg_dump.c                   |  60 ++++++--
 src/bin/pg_dump/pg_dump.h                   |   4 +
 src/bin/psql/describe.c                     |   8 +-
 src/bin/psql/tab-complete.c                 |   1 +
 src/include/catalog/catversion.h            |   2 +-
 src/include/catalog/pg_proc.h               |  13 ++
 src/include/catalog/pg_publication.h        |   6 +-
 src/include/nodes/parsenodes.h              |   1 +
 src/include/utils/acl.h                     |   4 +
 src/test/regress/expected/publication.out   |  33 +++--
 src/test/regress/sql/publication.sql        |   4 +
 src/test/subscription/t/003_privileges.pl   |  72 ++++++++++
 24 files changed, 675 insertions(+), 25 deletions(-)
 create mode 100644 src/test/subscription/t/003_privileges.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 2c2da2ad8a..5f5fdc31c0 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -5378,6 +5378,18 @@ <title><structname>pg_publication</structname> Columns</title>
       <entry>If true, <command>DELETE</command> operations are replicated for
        tables in the publication.</entry>
      </row>
+
+     <row>
+      <entry><structfield>pubacl</structfield></entry>
+      <entry><type>aclitem[]</type></entry>
+      <entry></entry>
+      <entry>
+       Access privileges; see
+       <xref linkend="sql-grant"> and
+       <xref linkend="sql-revoke">
+       for details
+      </entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index a521912317..6cb4cff199 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -16173,6 +16173,21 @@ <title>Access Privilege Inquiry Functions</title>
        <entry>does current user have privilege for language</entry>
       </row>
       <row>
+       <entry><literal><function>has_publication_privilege</function>(<parameter>user</parameter>,
+                                  <parameter>publication</parameter>,
+                                  <parameter>privilege</parameter>)</literal>
+       </entry>
+       <entry><type>boolean</type></entry>
+       <entry>does user have privilege for publication</entry>
+      </row>
+      <row>
+       <entry><literal><function>has_publication_privilege</function>(<parameter>publication</parameter>,
+                                  <parameter>privilege</parameter>)</literal>
+       </entry>
+       <entry><type>boolean</type></entry>
+       <entry>does current user have privilege for publication</entry>
+      </row>
+      <row>
        <entry><literal><function>has_schema_privilege</function>(<parameter>user</parameter>,
                                   <parameter>schema</parameter>,
                                   <parameter>privilege</parameter>)</literal>
@@ -16306,6 +16321,9 @@ <title>Access Privilege Inquiry Functions</title>
     <primary>has_language_privilege</primary>
    </indexterm>
    <indexterm>
+    <primary>has_publication_privilege</primary>
+   </indexterm>
+   <indexterm>
     <primary>has_schema_privilege</primary>
    </indexterm>
    <indexterm>
@@ -16450,6 +16468,15 @@ <title>Access Privilege Inquiry Functions</title>
    </para>
 
    <para>
+    <function>has_publication_privilege</function> checks whether a user
+    can access a publication in a particular way.
+    Its argument possibilities
+    are analogous to <function>has_table_privilege</function>.
+    The desired access privilege type must evaluate to
+    <literal>USAGE</literal>.
+   </para>
+
+   <para>
     <function>has_schema_privilege</function> checks whether a user
     can access a schema in a particular way.
     Its argument possibilities
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 6da39d25e3..4c8d454c9e 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -316,6 +316,11 @@ <title>Security</title>
   </para>
 
   <para>
+   To use a publication, the remote user of a subscription connection must
+   have the <literal>USAGE</literal> privilege on the publication.
+  </para>
+
+  <para>
    The subscription apply process will run in the local database with the
    privileges of a superuser.
   </para>
diff --git a/doc/src/sgml/ref/grant.sgml b/doc/src/sgml/ref/grant.sgml
index 9fb4c2fd7e..e1336e6857 100644
--- a/doc/src/sgml/ref/grant.sgml
+++ b/doc/src/sgml/ref/grant.sgml
@@ -67,6 +67,10 @@
     ON LARGE OBJECT <replaceable class="PARAMETER">loid</replaceable> [, ...]
     TO <replaceable class="PARAMETER">role_specification</replaceable> [, ...] [ WITH GRANT OPTION ]
 
+GRANT { USAGE | ALL [ PRIVILEGES ] }
+    ON PUBLICATION <replaceable>publication_name</replaceable> [, ...]
+    TO <replaceable class="PARAMETER">role_specification</replaceable> [, ...] [ WITH GRANT OPTION ]
+
 GRANT { { CREATE | USAGE } [, ...] | ALL [ PRIVILEGES ] }
     ON SCHEMA <replaceable>schema_name</replaceable> [, ...]
     TO <replaceable class="PARAMETER">role_specification</replaceable> [, ...] [ WITH GRANT OPTION ]
@@ -368,6 +372,10 @@ <title>GRANT on Database Objects</title>
        tables using the server, and also to create, alter, or drop their own
        user's user mappings associated with that server.
       </para>
+      <para>
+       For publications, this privilege allows a subscription to use the
+       publication.
+      </para>
      </listitem>
     </varlistentry>
 
diff --git a/doc/src/sgml/ref/revoke.sgml b/doc/src/sgml/ref/revoke.sgml
index ce532543f0..ba1a4e73a8 100644
--- a/doc/src/sgml/ref/revoke.sgml
+++ b/doc/src/sgml/ref/revoke.sgml
@@ -88,6 +88,12 @@
     [ CASCADE | RESTRICT ]
 
 REVOKE [ GRANT OPTION FOR ]
+    { USAGE | ALL [ PRIVILEGES ] }
+    ON PUBLICATION <replaceable>publication_name</replaceable> [, ...]
+    FROM { [ GROUP ] <replaceable class="PARAMETER">role_name</replaceable> | PUBLIC } [, ...]
+    [ CASCADE | RESTRICT ]
+
+REVOKE [ GRANT OPTION FOR ]
     { { CREATE | USAGE } [, ...] | ALL [ PRIVILEGES ] }
     ON SCHEMA <replaceable>schema_name</replaceable> [, ...]
     FROM { [ GROUP ] <replaceable class="PARAMETER">role_name</replaceable> | PUBLIC } [, ...]
diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c
index be86d76a59..e029485c8e 100644
--- a/src/backend/catalog/aclchk.c
+++ b/src/backend/catalog/aclchk.c
@@ -109,6 +109,7 @@ static void ExecGrant_Function(InternalGrant *grantStmt);
 static void ExecGrant_Language(InternalGrant *grantStmt);
 static void ExecGrant_Largeobject(InternalGrant *grantStmt);
 static void ExecGrant_Namespace(InternalGrant *grantStmt);
+static void ExecGrant_Publication(InternalGrant *grantStmt);
 static void ExecGrant_Tablespace(InternalGrant *grantStmt);
 static void ExecGrant_Type(InternalGrant *grantStmt);
 
@@ -283,6 +284,9 @@ restrict_and_check_grant(bool is_grant, AclMode avail_goptions, bool all_privs,
 		case ACL_KIND_TYPE:
 			whole_mask = ACL_ALL_RIGHTS_TYPE;
 			break;
+		case ACL_KIND_PUBLICATION:
+			whole_mask = ACL_ALL_RIGHTS_PUBLICATION;
+			break;
 		default:
 			elog(ERROR, "unrecognized object kind: %d", objkind);
 			/* not reached, but keep compiler quiet */
@@ -497,6 +501,10 @@ ExecuteGrantStmt(GrantStmt *stmt)
 			all_privileges = ACL_ALL_RIGHTS_FOREIGN_SERVER;
 			errormsg = gettext_noop("invalid privilege type %s for foreign server");
 			break;
+		case ACL_OBJECT_PUBLICATION:
+			all_privileges = ACL_ALL_RIGHTS_PUBLICATION;
+			errormsg = gettext_noop("invalid privilege type %s for publication");
+			break;
 		default:
 			elog(ERROR, "unrecognized GrantStmt.objtype: %d",
 				 (int) stmt->objtype);
@@ -594,6 +602,9 @@ ExecGrantStmt_oids(InternalGrant *istmt)
 		case ACL_OBJECT_NAMESPACE:
 			ExecGrant_Namespace(istmt);
 			break;
+		case ACL_OBJECT_PUBLICATION:
+			ExecGrant_Publication(istmt);
+			break;
 		case ACL_OBJECT_TABLESPACE:
 			ExecGrant_Tablespace(istmt);
 			break;
@@ -736,6 +747,15 @@ objectNamesToOids(GrantObjectType objtype, List *objnames)
 				objects = lappend_oid(objects, srvid);
 			}
 			break;
+		case ACL_OBJECT_PUBLICATION:
+			foreach(cell, objnames)
+			{
+				char	   *pubname = strVal(lfirst(cell));
+				Oid			pubid = get_publication_oid(pubname, false);
+
+				objects = lappend_oid(objects, pubid);
+			}
+			break;
 		default:
 			elog(ERROR, "unrecognized GrantStmt.objtype: %d",
 				 (int) objtype);
@@ -2936,6 +2956,126 @@ ExecGrant_Namespace(InternalGrant *istmt)
 }
 
 static void
+ExecGrant_Publication(InternalGrant *istmt)
+{
+	Relation	relation;
+	ListCell   *cell;
+
+	if (istmt->all_privs && istmt->privileges == ACL_NO_RIGHTS)
+		istmt->privileges = ACL_ALL_RIGHTS_PUBLICATION;
+
+	relation = heap_open(PublicationRelationId, RowExclusiveLock);
+
+	foreach(cell, istmt->objects)
+	{
+		Oid			pubId = lfirst_oid(cell);
+		Form_pg_publication pg_publication_tuple;
+		Datum		aclDatum;
+		bool		isNull;
+		AclMode		avail_goptions;
+		AclMode		this_privileges;
+		Acl		   *old_acl;
+		Acl		   *new_acl;
+		Oid			grantorId;
+		Oid			ownerId;
+		HeapTuple	newtuple;
+		Datum		values[Natts_pg_publication];
+		bool		nulls[Natts_pg_publication];
+		bool		replaces[Natts_pg_publication];
+		int			noldmembers;
+		int			nnewmembers;
+		Oid		   *oldmembers;
+		Oid		   *newmembers;
+		HeapTuple	tuple;
+
+		/* Search syscache for pg_publication */
+		tuple = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubId));
+		if (!HeapTupleIsValid(tuple))
+			elog(ERROR, "cache lookup failed for publication %u", pubId);
+
+		pg_publication_tuple = (Form_pg_publication) GETSTRUCT(tuple);
+
+		/*
+		 * Get owner ID and working copy of existing ACL. If there's no ACL,
+		 * substitute the proper default.
+		 */
+		ownerId = pg_publication_tuple->pubowner;
+		aclDatum = heap_getattr(tuple, Anum_pg_publication_pubacl,
+								RelationGetDescr(relation), &isNull);
+		if (isNull)
+		{
+			old_acl = acldefault(ACL_OBJECT_PUBLICATION, ownerId);
+			/* There are no old member roles according to the catalogs */
+			noldmembers = 0;
+			oldmembers = NULL;
+		}
+		else
+		{
+			old_acl = DatumGetAclPCopy(aclDatum);
+			/* Get the roles mentioned in the existing ACL */
+			noldmembers = aclmembers(old_acl, &oldmembers);
+		}
+
+		/* Determine ID to do the grant as, and available grant options */
+		select_best_grantor(GetUserId(), istmt->privileges,
+							old_acl, ownerId,
+							&grantorId, &avail_goptions);
+
+		/*
+		 * Restrict the privileges to what we can actually grant, and emit the
+		 * standards-mandated warning and error messages.
+		 */
+		this_privileges =
+			restrict_and_check_grant(istmt->is_grant, avail_goptions,
+									 istmt->all_privs, istmt->privileges,
+									 pubId, grantorId, ACL_KIND_PUBLICATION,
+									 NameStr(pg_publication_tuple->pubname),
+									 0, NULL);
+
+		/*
+		 * Generate new ACL.
+		 */
+		new_acl = merge_acl_with_grant(old_acl, istmt->is_grant,
+									   istmt->grant_option, istmt->behavior,
+									   istmt->grantees, this_privileges,
+									   grantorId, ownerId);
+
+		/*
+		 * We need the members of both old and new ACLs so we can correct the
+		 * shared dependency information.
+		 */
+		nnewmembers = aclmembers(new_acl, &newmembers);
+
+		/* finished building new ACL value, now insert it */
+		MemSet(values, 0, sizeof(values));
+		MemSet(nulls, false, sizeof(nulls));
+		MemSet(replaces, false, sizeof(replaces));
+
+		replaces[Anum_pg_publication_pubacl - 1] = true;
+		values[Anum_pg_publication_pubacl - 1] = PointerGetDatum(new_acl);
+
+		newtuple = heap_modify_tuple(tuple, RelationGetDescr(relation), values,
+									 nulls, replaces);
+
+		CatalogTupleUpdate(relation, &newtuple->t_self, newtuple);
+
+		/* Update the shared dependency ACL info */
+		updateAclDependencies(PublicationRelationId, pubId, 0,
+							  ownerId,
+							  noldmembers, oldmembers,
+							  nnewmembers, newmembers);
+
+		ReleaseSysCache(tuple);
+		pfree(new_acl);
+
+		/* prevent error when processing duplicate objects */
+		CommandCounterIncrement();
+	}
+
+	heap_close(relation, RowExclusiveLock);
+}
+
+static void
 ExecGrant_Tablespace(InternalGrant *istmt)
 {
 	Relation	relation;
@@ -4192,6 +4332,67 @@ pg_foreign_server_aclmask(Oid srv_oid, Oid roleid,
 }
 
 /*
+ * Exported routine for examining a user's privileges for a publication.
+ */
+AclMode
+pg_publication_aclmask(Oid pub_oid, Oid roleid,
+					   AclMode mask, AclMaskHow how)
+{
+	AclMode		result;
+	HeapTuple	tuple;
+	Datum		aclDatum;
+	bool		isNull;
+	Acl		   *acl;
+	Oid			ownerId;
+
+	Form_pg_publication pubForm;
+
+	/* Bypass permission checks for superusers */
+	if (superuser_arg(roleid))
+		return mask;
+
+	/*
+	 * Must get the publication's tuple from pg_publication
+	 */
+	tuple = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pub_oid));
+	if (!HeapTupleIsValid(tuple))
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("publication with OID %u does not exist",
+						pub_oid)));
+	pubForm = (Form_pg_publication) GETSTRUCT(tuple);
+
+	/*
+	 * Normal case: get the publication's ACL from pg_publication
+	 */
+	ownerId = pubForm->pubowner;
+
+	aclDatum = SysCacheGetAttr(PUBLICATIONOID, tuple,
+							   Anum_pg_publication_pubacl, &isNull);
+	if (isNull)
+	{
+		/* No ACL, so build default ACL */
+		acl = acldefault(ACL_OBJECT_PUBLICATION, ownerId);
+		aclDatum = (Datum) 0;
+	}
+	else
+	{
+		/* detoast rel's ACL if necessary */
+		acl = DatumGetAclP(aclDatum);
+	}
+
+	result = aclmask(acl, roleid, ownerId, mask, how);
+
+	/* if we have a detoasted copy, free it */
+	if (acl && (Pointer) acl != DatumGetPointer(aclDatum))
+		pfree(acl);
+
+	ReleaseSysCache(tuple);
+
+	return result;
+}
+
+/*
  * Exported routine for examining a user's privileges for a type.
  */
 AclMode
@@ -4502,6 +4703,18 @@ pg_foreign_server_aclcheck(Oid srv_oid, Oid roleid, AclMode mode)
 }
 
 /*
+ * Exported routine for checking a user's access privileges to a publication
+ */
+AclResult
+pg_publication_aclcheck(Oid pub_oid, Oid roleid, AclMode mode)
+{
+	if (pg_publication_aclmask(pub_oid, roleid, mode, ACLMASK_ANY) != 0)
+		return ACLCHECK_OK;
+	else
+		return ACLCHECK_NO_PRIV;
+}
+
+/*
  * Exported routine for checking a user's access privileges to a type
  */
 AclResult
diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c
index 346b347ae1..7438cfa59a 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -1199,6 +1199,7 @@ EventTriggerSupportsGrantObjectType(GrantObjectType objtype)
 		case ACL_OBJECT_LANGUAGE:
 		case ACL_OBJECT_LARGEOBJECT:
 		case ACL_OBJECT_NAMESPACE:
+		case ACL_OBJECT_PUBLICATION:
 		case ACL_OBJECT_TYPE:
 			return true;
 		default:
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index d69e39dc5b..6d1f392348 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -212,6 +212,8 @@ CreatePublication(CreatePublicationStmt *stmt)
 	values[Anum_pg_publication_pubdelete - 1] =
 		BoolGetDatum(publish_delete);
 
+	nulls[Anum_pg_publication_pubacl - 1] = true;
+
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
 	/* Insert tuple into catalog. */
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 6316688a88..66b957bde4 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -6769,6 +6769,14 @@ privilege_target:
 					n->objs = $3;
 					$$ = n;
 				}
+			| PUBLICATION name_list
+				{
+					PrivTarget *n = (PrivTarget *) palloc(sizeof(PrivTarget));
+					n->targtype = ACL_TARGET_OBJECT;
+					n->objtype = ACL_OBJECT_PUBLICATION;
+					n->objs = $2;
+					$$ = n;
+				}
 			| SCHEMA name_list
 				{
 					PrivTarget *n = (PrivTarget *) palloc(sizeof(PrivTarget));
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 0ceb4be375..54fb8fa185 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -14,6 +14,8 @@
 
 #include "catalog/pg_publication.h"
 
+#include "miscadmin.h"
+
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
 #include "replication/origin.h"
@@ -398,6 +400,12 @@ LoadPublications(List *pubnames)
 	{
 		char		   *pubname = (char *) lfirst(lc);
 		Publication	   *pub = GetPublicationByName(pubname, false);
+		AclResult		aclresult;
+
+		aclresult = pg_publication_aclcheck(pub->oid, GetUserId(), ACL_USAGE);
+		if (aclresult != ACLCHECK_OK)
+			aclcheck_error(aclresult, ACL_KIND_PUBLICATION,
+						   get_publication_name(pub->oid));
 
 		result = lappend(result, pub);
 	}
diff --git a/src/backend/utils/adt/acl.c b/src/backend/utils/adt/acl.c
index 35bdfc9a46..5298e3dfe3 100644
--- a/src/backend/utils/adt/acl.c
+++ b/src/backend/utils/adt/acl.c
@@ -106,6 +106,8 @@ static Oid	convert_function_name(text *functionname);
 static AclMode convert_function_priv_string(text *priv_type_text);
 static Oid	convert_language_name(text *languagename);
 static AclMode convert_language_priv_string(text *priv_type_text);
+static Oid	convert_publication_name(text *publicationname);
+static AclMode convert_publication_priv_string(text *priv_type_text);
 static Oid	convert_schema_name(text *schemaname);
 static AclMode convert_schema_priv_string(text *priv_type_text);
 static Oid	convert_server_name(text *servername);
@@ -791,6 +793,10 @@ acldefault(GrantObjectType objtype, Oid ownerId)
 			world_default = ACL_USAGE;
 			owner_default = ACL_ALL_RIGHTS_TYPE;
 			break;
+		case ACL_OBJECT_PUBLICATION:
+			world_default = ACL_NO_RIGHTS;
+			owner_default = ACL_ALL_RIGHTS_PUBLICATION;
+			break;
 		default:
 			elog(ERROR, "unrecognized objtype: %d", (int) objtype);
 			world_default = ACL_NO_RIGHTS;		/* keep compiler quiet */
@@ -883,6 +889,9 @@ acldefault_sql(PG_FUNCTION_ARGS)
 		case 'S':
 			objtype = ACL_OBJECT_FOREIGN_SERVER;
 			break;
+		case 'p':
+			objtype = ACL_OBJECT_PUBLICATION;
+			break;
 		case 'T':
 			objtype = ACL_OBJECT_TYPE;
 			break;
@@ -3621,6 +3630,197 @@ convert_language_priv_string(text *priv_type_text)
 
 
 /*
+ * has_publication_privilege variants
+ *		These are all named "has_publication_privilege" at the SQL level.
+ *		They take various combinations of publication name, publication OID,
+ *		user name, user OID, or implicit user = current_user.
+ *
+ *		The result is a boolean value: true if user has the indicated
+ *		privilege, false if not, or NULL if object doesn't exist.
+ */
+
+/*
+ * has_publication_privilege_name_name
+ *		Check user privileges on a publication given
+ *		name username, text publicationname, and text priv name.
+ */
+Datum
+has_publication_privilege_name_name(PG_FUNCTION_ARGS)
+{
+	Name		username = PG_GETARG_NAME(0);
+	text	   *publicationname = PG_GETARG_TEXT_P(1);
+	text	   *priv_type_text = PG_GETARG_TEXT_P(2);
+	Oid			roleid;
+	Oid			publicationoid;
+	AclMode		mode;
+	AclResult	aclresult;
+
+	roleid = get_role_oid_or_public(NameStr(*username));
+	publicationoid = convert_publication_name(publicationname);
+	mode = convert_publication_priv_string(priv_type_text);
+
+	aclresult = pg_publication_aclcheck(publicationoid, roleid, mode);
+
+	PG_RETURN_BOOL(aclresult == ACLCHECK_OK);
+}
+
+/*
+ * has_publication_privilege_name
+ *		Check user privileges on a publication given
+ *		text publicationname and text priv name.
+ *		current_user is assumed
+ */
+Datum
+has_publication_privilege_name(PG_FUNCTION_ARGS)
+{
+	text	   *publicationname = PG_GETARG_TEXT_P(0);
+	text	   *priv_type_text = PG_GETARG_TEXT_P(1);
+	Oid			roleid;
+	Oid			publicationoid;
+	AclMode		mode;
+	AclResult	aclresult;
+
+	roleid = GetUserId();
+	publicationoid = convert_publication_name(publicationname);
+	mode = convert_publication_priv_string(priv_type_text);
+
+	aclresult = pg_publication_aclcheck(publicationoid, roleid, mode);
+
+	PG_RETURN_BOOL(aclresult == ACLCHECK_OK);
+}
+
+/*
+ * has_publication_privilege_name_id
+ *		Check user privileges on a publication given
+ *		name usename, publication oid, and text priv name.
+ */
+Datum
+has_publication_privilege_name_id(PG_FUNCTION_ARGS)
+{
+	Name		username = PG_GETARG_NAME(0);
+	Oid			publicationoid = PG_GETARG_OID(1);
+	text	   *priv_type_text = PG_GETARG_TEXT_P(2);
+	Oid			roleid;
+	AclMode		mode;
+	AclResult	aclresult;
+
+	roleid = get_role_oid_or_public(NameStr(*username));
+	mode = convert_publication_priv_string(priv_type_text);
+
+	if (!SearchSysCacheExists1(PUBLICATIONOID, ObjectIdGetDatum(publicationoid)))
+		PG_RETURN_NULL();
+
+	aclresult = pg_publication_aclcheck(publicationoid, roleid, mode);
+
+	PG_RETURN_BOOL(aclresult == ACLCHECK_OK);
+}
+
+/*
+ * has_publication_privilege_id
+ *		Check user privileges on a publication given
+ *		publication oid, and text priv name.
+ *		current_user is assumed
+ */
+Datum
+has_publication_privilege_id(PG_FUNCTION_ARGS)
+{
+	Oid			publicationoid = PG_GETARG_OID(0);
+	text	   *priv_type_text = PG_GETARG_TEXT_P(1);
+	Oid			roleid;
+	AclMode		mode;
+	AclResult	aclresult;
+
+	roleid = GetUserId();
+	mode = convert_publication_priv_string(priv_type_text);
+
+	if (!SearchSysCacheExists1(PUBLICATIONOID, ObjectIdGetDatum(publicationoid)))
+		PG_RETURN_NULL();
+
+	aclresult = pg_publication_aclcheck(publicationoid, roleid, mode);
+
+	PG_RETURN_BOOL(aclresult == ACLCHECK_OK);
+}
+
+/*
+ * has_publication_privilege_id_name
+ *		Check user privileges on a publication given
+ *		roleid, text publicationname, and text priv name.
+ */
+Datum
+has_publication_privilege_id_name(PG_FUNCTION_ARGS)
+{
+	Oid			roleid = PG_GETARG_OID(0);
+	text	   *publicationname = PG_GETARG_TEXT_P(1);
+	text	   *priv_type_text = PG_GETARG_TEXT_P(2);
+	Oid			publicationoid;
+	AclMode		mode;
+	AclResult	aclresult;
+
+	publicationoid = convert_publication_name(publicationname);
+	mode = convert_publication_priv_string(priv_type_text);
+
+	aclresult = pg_publication_aclcheck(publicationoid, roleid, mode);
+
+	PG_RETURN_BOOL(aclresult == ACLCHECK_OK);
+}
+
+/*
+ * has_publication_privilege_id_id
+ *		Check user privileges on a publication given
+ *		roleid, publication oid, and text priv name.
+ */
+Datum
+has_publication_privilege_id_id(PG_FUNCTION_ARGS)
+{
+	Oid			roleid = PG_GETARG_OID(0);
+	Oid			publicationoid = PG_GETARG_OID(1);
+	text	   *priv_type_text = PG_GETARG_TEXT_P(2);
+	AclMode		mode;
+	AclResult	aclresult;
+
+	mode = convert_publication_priv_string(priv_type_text);
+
+	if (!SearchSysCacheExists1(PUBLICATIONOID, ObjectIdGetDatum(publicationoid)))
+		PG_RETURN_NULL();
+
+	aclresult = pg_publication_aclcheck(publicationoid, roleid, mode);
+
+	PG_RETURN_BOOL(aclresult == ACLCHECK_OK);
+}
+
+/*
+ *		Support routines for has_publication_privilege family.
+ */
+
+/*
+ * Given a publication name expressed as a string, look it up and return Oid
+ */
+static Oid
+convert_publication_name(text *publicationname)
+{
+	char	   *pubname = text_to_cstring(publicationname);
+
+	return get_publication_oid(pubname, false);
+}
+
+/*
+ * convert_publication_priv_string
+ *		Convert text string to AclMode value.
+ */
+static AclMode
+convert_publication_priv_string(text *priv_type_text)
+{
+	static const priv_map publication_priv_map[] = {
+		{"USAGE", ACL_USAGE},
+		{"USAGE WITH GRANT OPTION", ACL_GRANT_OPTION_FOR(ACL_USAGE)},
+		{NULL, 0}
+	};
+
+	return convert_any_priv_string(priv_type_text, publication_priv_map);
+}
+
+
+/*
  * has_schema_privilege variants
  *		These are all named "has_schema_privilege" at the SQL level.
  *		They take various combinations of schema name, schema OID,
diff --git a/src/bin/pg_dump/dumputils.c b/src/bin/pg_dump/dumputils.c
index b41f2b9125..2e49d5d12a 100644
--- a/src/bin/pg_dump/dumputils.c
+++ b/src/bin/pg_dump/dumputils.c
@@ -520,6 +520,8 @@ do { \
 		CONVERT_PRIV('X', "EXECUTE");
 	else if (strcmp(type, "LANGUAGE") == 0)
 		CONVERT_PRIV('U', "USAGE");
+	else if (strcmp(type, "PUBLICATION") == 0)
+		CONVERT_PRIV('U', "USAGE");
 	else if (strcmp(type, "SCHEMA") == 0)
 	{
 		CONVERT_PRIV('C', "CREATE");
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index e67171dccb..7d31d46cbd 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3349,7 +3349,11 @@ dumpPolicy(Archive *fout, PolicyInfo *polinfo)
 void
 getPublications(Archive *fout)
 {
-	PQExpBuffer query;
+	PQExpBuffer acl_subquery = createPQExpBuffer();
+	PQExpBuffer racl_subquery = createPQExpBuffer();
+	PQExpBuffer initacl_subquery = createPQExpBuffer();
+	PQExpBuffer initracl_subquery = createPQExpBuffer();
+	PQExpBuffer query = createPQExpBuffer();
 	PGresult   *res;
 	PublicationInfo *pubinfo;
 	int			i_tableoid;
@@ -3360,24 +3364,46 @@ getPublications(Archive *fout)
 	int			i_pubinsert;
 	int			i_pubupdate;
 	int			i_pubdelete;
+	int			i_pubacl;
+	int			i_rpubacl;
+	int			i_initpubacl;
+	int			i_initrpubacl;
 	int			i,
 				ntups;
 
+
 	if (fout->remoteVersion < 100000)
 		return;
 
-	query = createPQExpBuffer();
-
-	resetPQExpBuffer(query);
+	buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
+					initracl_subquery, "p.pubacl", "p.pubowner", "'p'",
+					fout->dopt->binary_upgrade);
 
 	/* Get the publications. */
 	appendPQExpBuffer(query,
 					  "SELECT p.tableoid, p.oid, p.pubname, "
+					  "%s AS pubacl, "
+					  "%s AS rpubacl, "
+					  "%s AS initpubacl, "
+					  "%s AS initrpubacl, "
 					  "(%s p.pubowner) AS rolname, "
 					  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete "
-					  "FROM pg_catalog.pg_publication p",
+					  "FROM pg_catalog.pg_publication p "
+					  "LEFT JOIN pg_init_privs pip ON "
+					  "(p.oid = pip.objoid "
+					  "AND pip.classoid = 'pg_publication'::regclass "
+					  "AND pip.objsubid = 0) ",
+					  acl_subquery->data,
+					  racl_subquery->data,
+					  initacl_subquery->data,
+					  initracl_subquery->data,
 					  username_subquery);
 
+	destroyPQExpBuffer(acl_subquery);
+	destroyPQExpBuffer(racl_subquery);
+	destroyPQExpBuffer(initacl_subquery);
+	destroyPQExpBuffer(initracl_subquery);
+
 	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 
 	ntups = PQntuples(res);
@@ -3390,6 +3416,10 @@ getPublications(Archive *fout)
 	i_pubinsert = PQfnumber(res, "pubinsert");
 	i_pubupdate = PQfnumber(res, "pubupdate");
 	i_pubdelete = PQfnumber(res, "pubdelete");
+	i_pubacl = PQfnumber(res, "pubacl");
+	i_rpubacl = PQfnumber(res, "rpubacl");
+	i_initpubacl = PQfnumber(res, "initpubacl");
+	i_initrpubacl = PQfnumber(res, "initrpubacl");
 
 	pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
 
@@ -3410,6 +3440,10 @@ getPublications(Archive *fout)
 			(strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0);
 		pubinfo[i].pubdelete =
 			(strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
+		pubinfo[i].pubacl = pg_strdup(PQgetvalue(res, i, i_pubacl));
+		pubinfo[i].rpubacl = pg_strdup(PQgetvalue(res, i, i_rpubacl));
+		pubinfo[i].initpubacl = pg_strdup(PQgetvalue(res, i, i_initpubacl));
+		pubinfo[i].initrpubacl = pg_strdup(PQgetvalue(res, i, i_initrpubacl));
 
 		if (strlen(pubinfo[i].rolname) == 0)
 			write_msg(NULL, "WARNING: owner of publication \"%s\" appears to be invalid\n",
@@ -3430,6 +3464,7 @@ dumpPublication(Archive *fout, PublicationInfo *pubinfo)
 	DumpOptions *dopt = fout->dopt;
 	PQExpBuffer delq;
 	PQExpBuffer query;
+	char	   *qpubname;
 
 	if (dopt->dataOnly)
 		return;
@@ -3437,11 +3472,11 @@ dumpPublication(Archive *fout, PublicationInfo *pubinfo)
 	delq = createPQExpBuffer();
 	query = createPQExpBuffer();
 
-	appendPQExpBuffer(delq, "DROP PUBLICATION %s;\n",
-					  fmtId(pubinfo->dobj.name));
+	qpubname = pg_strdup(fmtId(pubinfo->dobj.name));
 
-	appendPQExpBuffer(query, "CREATE PUBLICATION %s",
-					  fmtId(pubinfo->dobj.name));
+	appendPQExpBuffer(delq, "DROP PUBLICATION %s;\n", qpubname);
+
+	appendPQExpBuffer(query, "CREATE PUBLICATION %s", qpubname);
 
 	if (pubinfo->puballtables)
 		appendPQExpBufferStr(query, " FOR ALL TABLES");
@@ -3474,6 +3509,13 @@ dumpPublication(Archive *fout, PublicationInfo *pubinfo)
 				 NULL, 0,
 				 NULL, NULL);
 
+	if (pubinfo->dobj.dump & DUMP_COMPONENT_ACL)
+		dumpACL(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId, "PUBLICATION",
+				qpubname, NULL, pubinfo->dobj.name,
+				NULL, pubinfo->rolname, pubinfo->pubacl, pubinfo->rpubacl,
+				pubinfo->initpubacl, pubinfo->initrpubacl);
+
+	free(qpubname);
 	destroyPQExpBuffer(delq);
 	destroyPQExpBuffer(query);
 }
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index a466527ec6..533ac24ebf 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -583,6 +583,10 @@ typedef struct _PublicationInfo
 	bool		pubinsert;
 	bool		pubupdate;
 	bool		pubdelete;
+	char	   *pubacl;
+	char	   *rpubacl;
+	char	   *initpubacl;
+	char	   *initrpubacl;
 } PublicationInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 61a3e2a848..6c697ce448 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5003,7 +5003,9 @@ describePublications(const char *pattern)
 
 	printfPQExpBuffer(&buf,
 					  "SELECT oid, pubname, puballtables, pubinsert,\n"
-					  "  pubupdate, pubdelete\n"
+					  "  pubupdate, pubdelete, ");
+	printACLColumn(&buf, "pubacl");
+	appendPQExpBuffer(&buf,
 					  "FROM pg_catalog.pg_publication\n");
 
 	processSQLNamePattern(pset.db, &buf, pattern, false, false,
@@ -5022,7 +5024,7 @@ describePublications(const char *pattern)
 	for (i = 0; i < PQntuples(res); i++)
 	{
 		const char	align = 'l';
-		int			ncols = 3;
+		int			ncols = 4;
 		int			nrows = 1;
 		int			tables = 0;
 		PGresult   *tabres;
@@ -5041,10 +5043,12 @@ describePublications(const char *pattern)
 		printTableAddHeader(&cont, gettext_noop("Inserts"), true, align);
 		printTableAddHeader(&cont, gettext_noop("Updates"), true, align);
 		printTableAddHeader(&cont, gettext_noop("Deletes"), true, align);
+		printTableAddHeader(&cont, gettext_noop("Access privileges"), true, align);
 
 		printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false);
 		printTableAddCell(&cont, PQgetvalue(res, i, 4), false, false);
 		printTableAddCell(&cont, PQgetvalue(res, i, 5), false, false);
+		printTableAddCell(&cont, PQgetvalue(res, i, 6), false, false);
 
 		if (puballtables)
 			printfPQExpBuffer(&buf,
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index e8458e939e..84633559c0 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -2800,6 +2800,7 @@ psql_completion(const char *text, int start, int end)
 								   " UNION SELECT 'FUNCTION'"
 								   " UNION SELECT 'LANGUAGE'"
 								   " UNION SELECT 'LARGE OBJECT'"
+								   " UNION SELECT 'PUBLICATION'"
 								   " UNION SELECT 'SCHEMA'"
 								   " UNION SELECT 'SEQUENCE'"
 								   " UNION SELECT 'TABLE'"
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index b24e3953a1..eb2cb8d862 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
  */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	201703151
+#define CATALOG_VERSION_NO	201703155
 
 #endif
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 3d5d866071..990ce2bf33 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -3611,6 +3611,19 @@ DESCR("current user privilege on language by language name");
 DATA(insert OID = 2267 (  has_language_privilege		   PGNSP PGUID 12 1 0 0 0 f f f f t f s s 2 0 16 "26 25" _null_ _null_ _null_ _null_ _null_ has_language_privilege_id _null_ _null_ _null_ ));
 DESCR("current user privilege on language by language oid");
 
+DATA(insert OID = 4001 (  has_publication_privilege		   PGNSP PGUID 12 1 0 0 0 f f f f t f s s 3 0 16 "19 25 25" _null_ _null_ _null_ _null_ _null_	has_publication_privilege_name_name _null_ _null_ _null_ ));
+DESCR("user privilege on publication by username, publication name");
+DATA(insert OID = 4002 (  has_publication_privilege		   PGNSP PGUID 12 1 0 0 0 f f f f t f s s 3 0 16 "19 26 25" _null_ _null_ _null_ _null_ _null_	has_publication_privilege_name_id _null_ _null_ _null_ ));
+DESCR("user privilege on publication by username, publication oid");
+DATA(insert OID = 4003 (  has_publication_privilege		   PGNSP PGUID 12 1 0 0 0 f f f f t f s s 3 0 16 "26 25 25" _null_ _null_ _null_ _null_ _null_	has_publication_privilege_id_name _null_ _null_ _null_ ));
+DESCR("user privilege on publication by user oid, publication name");
+DATA(insert OID = 4004 (  has_publication_privilege		   PGNSP PGUID 12 1 0 0 0 f f f f t f s s 3 0 16 "26 26 25" _null_ _null_ _null_ _null_ _null_	has_publication_privilege_id_id _null_ _null_ _null_ ));
+DESCR("user privilege on publication by user oid, publication oid");
+DATA(insert OID = 4005 (  has_publication_privilege		   PGNSP PGUID 12 1 0 0 0 f f f f t f s s 2 0 16 "25 25" _null_ _null_ _null_ _null_ _null_ has_publication_privilege_name _null_ _null_ _null_ ));
+DESCR("current user privilege on publication by publication name");
+DATA(insert OID = 4006 (  has_publication_privilege		   PGNSP PGUID 12 1 0 0 0 f f f f t f s s 2 0 16 "26 25" _null_ _null_ _null_ _null_ _null_ has_publication_privilege_id _null_ _null_ _null_ ));
+DESCR("current user privilege on publication by publication oid");
+
 DATA(insert OID = 2268 (  has_schema_privilege		   PGNSP PGUID 12 1 0 0 0 f f f f t f s s 3 0 16 "19 25 25" _null_ _null_ _null_ _null_ _null_	has_schema_privilege_name_name _null_ _null_ _null_ ));
 DESCR("user privilege on schema by username, schema name");
 DATA(insert OID = 2269 (  has_schema_privilege		   PGNSP PGUID 12 1 0 0 0 f f f f t f s s 3 0 16 "19 26 25" _null_ _null_ _null_ _null_ _null_	has_schema_privilege_name_id _null_ _null_ _null_ ));
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index f3c4f3932b..bce81c373f 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -49,6 +49,9 @@ CATALOG(pg_publication,6104)
 	/* true if deletes are published */
 	bool		pubdelete;
 
+#ifdef CATALOG_VARLEN			/* variable-length fields start here */
+	aclitem		pubacl[1];		/* access permissions */
+#endif
 } FormData_pg_publication;
 
 /* ----------------
@@ -63,13 +66,14 @@ typedef FormData_pg_publication *Form_pg_publication;
  * ----------------
  */
 
-#define Natts_pg_publication				6
+#define Natts_pg_publication				7
 #define Anum_pg_publication_pubname			1
 #define Anum_pg_publication_pubowner		2
 #define Anum_pg_publication_puballtables	3
 #define Anum_pg_publication_pubinsert		4
 #define Anum_pg_publication_pubupdate		5
 #define Anum_pg_publication_pubdelete		6
+#define Anum_pg_publication_pubacl			7
 
 typedef struct PublicationActions
 {
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index d576523f6a..276489e556 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1782,6 +1782,7 @@ typedef enum GrantObjectType
 	ACL_OBJECT_LANGUAGE,		/* procedural language */
 	ACL_OBJECT_LARGEOBJECT,		/* largeobject */
 	ACL_OBJECT_NAMESPACE,		/* namespace */
+	ACL_OBJECT_PUBLICATION,		/* publication */
 	ACL_OBJECT_TABLESPACE,		/* tablespace */
 	ACL_OBJECT_TYPE				/* type */
 } GrantObjectType;
diff --git a/src/include/utils/acl.h b/src/include/utils/acl.h
index 0d118525c9..5a5836fd4a 100644
--- a/src/include/utils/acl.h
+++ b/src/include/utils/acl.h
@@ -158,6 +158,7 @@ typedef ArrayType Acl;
 #define ACL_ALL_RIGHTS_NAMESPACE	(ACL_USAGE|ACL_CREATE)
 #define ACL_ALL_RIGHTS_TABLESPACE	(ACL_CREATE)
 #define ACL_ALL_RIGHTS_TYPE			(ACL_USAGE)
+#define ACL_ALL_RIGHTS_PUBLICATION	(ACL_USAGE)
 
 /* operation codes for pg_*_aclmask */
 typedef enum
@@ -273,6 +274,8 @@ extern AclMode pg_foreign_data_wrapper_aclmask(Oid fdw_oid, Oid roleid,
 								AclMode mask, AclMaskHow how);
 extern AclMode pg_foreign_server_aclmask(Oid srv_oid, Oid roleid,
 						  AclMode mask, AclMaskHow how);
+extern AclMode pg_publication_aclmask(Oid pub_oid, Oid roleid,
+				AclMode mask, AclMaskHow how);
 extern AclMode pg_type_aclmask(Oid type_oid, Oid roleid,
 				AclMode mask, AclMaskHow how);
 
@@ -290,6 +293,7 @@ extern AclResult pg_namespace_aclcheck(Oid nsp_oid, Oid roleid, AclMode mode);
 extern AclResult pg_tablespace_aclcheck(Oid spc_oid, Oid roleid, AclMode mode);
 extern AclResult pg_foreign_data_wrapper_aclcheck(Oid fdw_oid, Oid roleid, AclMode mode);
 extern AclResult pg_foreign_server_aclcheck(Oid srv_oid, Oid roleid, AclMode mode);
+extern AclResult pg_publication_aclcheck(Oid pub_oid, Oid roleid, AclMode mode);
 extern AclResult pg_type_aclcheck(Oid type_oid, Oid roleid, AclMode mode);
 
 extern void aclcheck_error(AclResult aclerr, AclObjectKind objectkind,
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 5a7c0edf7d..f72bb1b2d3 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -76,10 +76,10 @@ ERROR:  relation "testpub_tbl1" is already member of publication "testpub_fortbl
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
 ERROR:  publication "testpub_fortbl" already exists
 \dRp+ testpub_fortbl
- Publication testpub_fortbl
- Inserts | Updates | Deletes 
----------+---------+---------
- t       | t       | t
+           Publication testpub_fortbl
+ Inserts | Updates | Deletes | Access privileges 
+---------+---------+---------+-------------------
+ t       | t       | t       | 
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -117,10 +117,10 @@ Publications:
     "testpub_fortbl"
 
 \dRp+ testpub_default
- Publication testpub_default
- Inserts | Updates | Deletes 
----------+---------+---------
- t       | t       | t
+           Publication testpub_default
+ Inserts | Updates | Deletes | Access privileges 
+---------+---------+---------+-------------------
+ t       | t       | t       | 
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -161,10 +161,10 @@ REVOKE CREATE ON DATABASE regression FROM regress_publication_user2;
 DROP VIEW testpub_view;
 DROP TABLE testpub_tbl1;
 \dRp+ testpub_default
- Publication testpub_default
- Inserts | Updates | Deletes 
----------+---------+---------
- t       | t       | t
+           Publication testpub_default
+ Inserts | Updates | Deletes | Access privileges 
+---------+---------+---------+-------------------
+ t       | t       | t       | 
 (1 row)
 
 -- fail - must be owner of publication
@@ -190,6 +190,15 @@ ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
  testpub_default | regress_publication_user2 | t       | t       | t
 (1 row)
 
+GRANT USAGE ON PUBLICATION testpub_default TO regress_publication_user;
+\dRp+ testpub_default
+                             Publication testpub_default
+ Inserts | Updates | Deletes |                   Access privileges                   
+---------+---------+---------+-------------------------------------------------------
+ t       | t       | t       | regress_publication_user2=U/regress_publication_user2+
+         |         |         | regress_publication_user=U/regress_publication_user2
+(1 row)
+
 DROP PUBLICATION testpub_default;
 DROP PUBLICATION testpib_ins_trunct;
 DROP PUBLICATION testpub_fortbl;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index cff9931a77..f237c3faa9 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -112,6 +112,10 @@ CREATE PUBLICATION testpub2;  -- ok
 
 \dRp testpub_default
 
+GRANT USAGE ON PUBLICATION testpub_default TO regress_publication_user;
+
+\dRp+ testpub_default
+
 DROP PUBLICATION testpub_default;
 DROP PUBLICATION testpib_ins_trunct;
 DROP PUBLICATION testpub_fortbl;
diff --git a/src/test/subscription/t/003_privileges.pl b/src/test/subscription/t/003_privileges.pl
new file mode 100644
index 0000000000..86fd866e28
--- /dev/null
+++ b/src/test/subscription/t/003_privileges.pl
@@ -0,0 +1,72 @@
+# Tests of privileges for logical replication
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+$node_publisher->safe_psql('postgres', qq(
+CREATE USER test_user1;
+CREATE USER test_user2 REPLICATION;
+GRANT CREATE ON DATABASE postgres TO test_user1;
+
+SET ROLE test_user1;
+CREATE TABLE test1 (a int PRIMARY KEY, b text);
+CREATE PUBLICATION mypub1 FOR TABLE test1;
+));
+
+my $appname = 'tap_sub';
+
+$node_subscriber->safe_psql('postgres', qq(
+CREATE TABLE test1 (a int PRIMARY KEY, b text);
+CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr user=test_user2 application_name=$appname' PUBLICATION mypub1;
+));
+
+$node_publisher->safe_psql('postgres', qq(
+SET ROLE test_user1;
+INSERT INTO test1 VALUES (1, 'one');
+));
+
+my $log = TestLib::slurp_file($node_publisher->logfile);
+like($log, qr/permission denied for publication mypub1/, "permission denied on publication");
+
+$node_publisher->safe_psql('postgres', qq(
+SET ROLE test_user1;
+GRANT USAGE ON PUBLICATION mypub1 TO test_user2;
+));
+
+# drop and recreate subscription so it sees the newly granted
+# privileges
+$node_subscriber->safe_psql('postgres', qq(
+DROP SUBSCRIPTION mysub1;
+CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr user=test_user2 application_name=$appname' PUBLICATION mypub1;
+));
+
+$node_publisher->safe_psql('postgres', qq(
+SET ROLE test_user1;
+INSERT INTO test1 VALUES (2, 'two');
+));
+
+my $caughtup_query =
+	"SELECT pg_current_wal_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$appname';";
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+my $result = $node_subscriber->safe_psql('postgres', qq(
+SELECT a, b FROM test1;
+));
+
+is($result, '2|two', 'replication catches up after privileges granted');
+
+$node_subscriber->stop;
+$node_publisher->stop;
-- 
2.12.0

From 44e7b9e5ba4449e10c0cd788534ac38f222fc68f Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <peter_e@gmx.net>
Date: Mon, 13 Feb 2017 16:28:36 -0500
Subject: [PATCH v2 4/5] Add subscription apply worker privilege checks

The subscription apply worker now checks INSERT/UPDATE/DELETE privileges
on a table before writing to it.  This was previously not checked, but
was not necessary since a subscription owner had to be a superuser.  But
we want to allow other users to own subscriptions.
---
 doc/src/sgml/logical-replication.sgml      |  4 +++-
 src/backend/replication/logical/relation.c | 24 ++++++++++++++++++++++++
 src/backend/replication/logical/worker.c   |  3 +++
 src/include/replication/logicalrelation.h  |  5 +++++
 4 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 4c8d454c9e..89b4335d00 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -322,7 +322,9 @@ <title>Security</title>
 
   <para>
    The subscription apply process will run in the local database with the
-   privileges of a superuser.
+   privileges of the subscription owner.  The subscription owner, if it is not
+   a superuser, therefore needs to have appropriate privileges on the tables
+   the subscription is operating on.
   </para>
 
   <para>
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index d8dc0c7194..4faee8323c 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -19,6 +19,7 @@
 #include "access/heapam.h"
 #include "access/sysattr.h"
 #include "catalog/namespace.h"
+#include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "replication/logicalrelation.h"
 #include "replication/worker_internal.h"
@@ -78,6 +79,18 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
 }
 
 /*
+ * Syscache invalidation callback for our relation map cache.
+ *
+ * Same as the relcache callback, except that we just invalidate all cache
+ * entries all the time.
+ */
+static void
+logicalrep_relmap_syscache_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+	logicalrep_relmap_invalidate_cb(arg, InvalidOid);
+}
+
+/*
  * Initialize the relation map cache.
  */
 static void
@@ -113,6 +126,8 @@ logicalrep_relmap_init()
 	/* Watch for invalidation events. */
 	CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
 								  (Datum) 0);
+	CacheRegisterSyscacheCallback(AUTHOID, logicalrep_relmap_syscache_invalidate_cb,
+								  (Datum) 0);
 	CacheRegisterSyscacheCallback(TYPEOID, logicalrep_typmap_invalidate_cb,
 								  (Datum) 0);
 }
@@ -277,6 +292,15 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 							remoterel->nspname, remoterel->relname)));
 
 		/*
+		 * Cache ACL results.  If they are changed while the worker is active,
+		 * the relcache and syscache invalidation will ensure that we get here
+		 * again to recompute this.
+		 */
+		entry->insert_aclresult = pg_class_aclcheck(relid, GetUserId(), ACL_INSERT);
+		entry->update_aclresult = pg_class_aclcheck(relid, GetUserId(), ACL_UPDATE);
+		entry->delete_aclresult = pg_class_aclcheck(relid, GetUserId(), ACL_DELETE);
+
+		/*
 		 * Build the mapping of local attribute numbers to remote attribute
 		 * numbers and validate that we don't miss any replicated columns
 		 * as that would result in potentially unwanted data loss.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c3e54af259..7534702df8 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -515,6 +515,7 @@ apply_handle_insert(StringInfo s)
 
 	relid = logicalrep_read_insert(s, &newtup);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
+	aclcheck_error(rel->insert_aclresult, ACL_KIND_CLASS, get_rel_name(rel->localreloid));
 
 	/* Initialize the executor state. */
 	estate = create_estate_for_relation(rel);
@@ -607,6 +608,7 @@ apply_handle_update(StringInfo s)
 	relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
 								   &newtup);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
+	aclcheck_error(rel->update_aclresult, ACL_KIND_CLASS, get_rel_name(rel->localreloid));
 
 	/* Check if we can do the update. */
 	check_relation_updatable(rel);
@@ -716,6 +718,7 @@ apply_handle_delete(StringInfo s)
 
 	relid = logicalrep_read_delete(s, &oldtup);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
+	aclcheck_error(rel->delete_aclresult, ACL_KIND_CLASS, get_rel_name(rel->localreloid));
 
 	/* Check if we can do the delete. */
 	check_relation_updatable(rel);
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 7fb7fbfb4d..f9d5144f61 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -25,6 +25,11 @@ typedef struct LogicalRepRelMapEntry
 										 * remote ones */
 	bool				updatable;		/* Can apply updates/deletes? */
 
+	/* Cache of ACL results */
+	AclResult			insert_aclresult;
+	AclResult			update_aclresult;
+	AclResult			delete_aclresult;
+
 	/* Sync state. */
 	char				state;
 	XLogRecPtr			statelsn;
-- 
2.12.0

From 3b3a5e1d6d3d24b063151975eb495cfdbcfc71a2 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <peter_e@gmx.net>
Date: Mon, 13 Feb 2017 15:42:29 -0500
Subject: [PATCH v2 5/5] Add CREATE SUBSCRIPTION privilege on databases

This new privilege allows the creation of subscriptions.  This was
previously only allowed for superusers.
---
 doc/src/sgml/logical-replication.sgml      |  3 ++-
 doc/src/sgml/ref/create_subscription.sgml  |  6 +++++
 doc/src/sgml/ref/grant.sgml                | 12 +++++++++-
 src/backend/catalog/aclchk.c               |  4 ++++
 src/backend/commands/subscriptioncmds.c    | 38 +++++++++++++++++++-----------
 src/backend/parser/gram.y                  |  6 +++++
 src/backend/utils/adt/acl.c                |  9 +++++++
 src/bin/pg_dump/dumputils.c                |  2 ++
 src/include/nodes/parsenodes.h             |  3 ++-
 src/include/utils/acl.h                    |  5 ++--
 src/test/regress/expected/subscription.out | 14 ++++++++++-
 src/test/regress/sql/subscription.sql      | 14 ++++++++++-
 12 files changed, 95 insertions(+), 21 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 89b4335d00..e49f0ef05b 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -312,7 +312,8 @@ <title>Security</title>
   </para>
 
   <para>
-   To create a subscription, the user must be a superuser.
+   To create a subscription, the user must have the <literal>CREATE
+   SUBSCRIPTION</literal> privilege in the database.
   </para>
 
   <para>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 9bed26219c..94dec39228 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -136,6 +136,12 @@ <title>Parameters</title>
   <title>Notes</title>
 
   <para>
+   To create a subscription, the invoking user must have the
+   <literal>CREATE SUBSCRIPTION</> privilege for the current database.  (Of
+   course, superusers bypass this check.)
+  </para>
+
+  <para>
    See <xref linkend="streaming-replication-authentication"> for details on
    how to configure access control between the subscription and the
    publication instance.
diff --git a/doc/src/sgml/ref/grant.sgml b/doc/src/sgml/ref/grant.sgml
index e1336e6857..9ca0eadb5c 100644
--- a/doc/src/sgml/ref/grant.sgml
+++ b/doc/src/sgml/ref/grant.sgml
@@ -38,7 +38,7 @@
          | ALL SEQUENCES IN SCHEMA <replaceable class="PARAMETER">schema_name</replaceable> [, ...] }
     TO <replaceable class="PARAMETER">role_specification</replaceable> [, ...] [ WITH GRANT OPTION ]
 
-GRANT { { CREATE | CONNECT | TEMPORARY | TEMP } [, ...] | ALL [ PRIVILEGES ] }
+GRANT { { CREATE | CONNECT | TEMPORARY | TEMP | CREATE SUBSCRIPTION } [, ...] | ALL [ PRIVILEGES ] }
     ON DATABASE <replaceable>database_name</replaceable> [, ...]
     TO <replaceable class="PARAMETER">role_specification</replaceable> [, ...] [ WITH GRANT OPTION ]
 
@@ -300,6 +300,15 @@ <title>GRANT on Database Objects</title>
     </varlistentry>
 
     <varlistentry>
+     <term>CREATE SUBSCRIPTION</term>
+     <listitem>
+      <para>
+       For databases, allows new subscriptions to be created within the database.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
      <term>CONNECT</term>
      <listitem>
       <para>
@@ -542,6 +551,7 @@ <title>Notes</title>
             X -- EXECUTE
             U -- USAGE
             C -- CREATE
+            S -- CREATE SUBSCRIPTION
             c -- CONNECT
             T -- TEMPORARY
       arwdDxt -- ALL PRIVILEGES (for tables, varies for other objects)
diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c
index e029485c8e..4ab6e0b784 100644
--- a/src/backend/catalog/aclchk.c
+++ b/src/backend/catalog/aclchk.c
@@ -3362,6 +3362,8 @@ string_to_privilege(const char *privname)
 		return ACL_CREATE_TEMP;
 	if (strcmp(privname, "connect") == 0)
 		return ACL_CONNECT;
+	if (strcmp(privname, "create subscription") == 0)
+		return ACL_CREATE_SUBSCRIPTION;
 	if (strcmp(privname, "rule") == 0)
 		return 0;				/* ignore old RULE privileges */
 	ereport(ERROR,
@@ -3399,6 +3401,8 @@ privilege_to_string(AclMode privilege)
 			return "TEMP";
 		case ACL_CONNECT:
 			return "CONNECT";
+		case ACL_CREATE_SUBSCRIPTION:
+			return "CREATE SUBSCRIPTION";
 		default:
 			elog(ERROR, "unrecognized privilege: %d", (int) privilege);
 	}
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 0198e6d75b..c20de68c75 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -26,6 +26,7 @@
 #include "catalog/pg_type.h"
 #include "catalog/pg_subscription.h"
 
+#include "commands/dbcommands.h"
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/subscriptioncmds.h"
@@ -221,6 +222,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	char		originname[NAMEDATALEN];
 	bool		create_slot;
 	List	   *publications;
+	AclResult	aclresult;
 
 	/*
 	 * Parse and check options.
@@ -239,10 +241,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	if (create_slot)
 		PreventTransactionChain(isTopLevel, "CREATE SUBSCRIPTION ... CREATE SLOT");
 
-	if (!superuser())
-		ereport(ERROR,
-				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-				 (errmsg("must be superuser to create subscriptions"))));
+	/* must have CREATE SUBSCRIPTION privilege on database */
+	aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE_SUBSCRIPTION);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, ACL_KIND_DATABASE,
+					   get_database_name(MyDatabaseId));
 
 	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
 
@@ -609,17 +612,24 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 	if (form->subowner == newOwnerId)
 		return;
 
-	if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
-		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
-					   NameStr(form->subname));
+	if (!superuser())
+	{
+		AclResult	aclresult;
 
-	/* New owner must be a superuser */
-	if (!superuser_arg(newOwnerId))
-		ereport(ERROR,
-				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-		  errmsg("permission denied to change owner of subscription \"%s\"",
-				 NameStr(form->subname)),
-			 errhint("The owner of an subscription must be a superuser.")));
+		/* Must be owner */
+		if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
+						   NameStr(form->subname));
+
+		/* Must be able to become new owner */
+		check_is_member_of_role(GetUserId(), newOwnerId);
+
+		/* New owner must have CREATE SUBSCRIPTION privilege on database */
+		aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE_SUBSCRIPTION);
+		if (aclresult != ACLCHECK_OK)
+			aclcheck_error(aclresult, ACL_KIND_DATABASE,
+						   get_database_name(MyDatabaseId));
+	}
 
 	form->subowner = newOwnerId;
 	CatalogTupleUpdate(rel, &tup->t_self, tup);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 66b957bde4..26f8d9bb0c 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -6682,6 +6682,12 @@ privilege:	SELECT opt_column_list
 				n->cols = $2;
 				$$ = n;
 			}
+		| CREATE ColId
+			{
+				AccessPriv *n = makeNode(AccessPriv);
+				n->priv_name = psprintf("create %s", $2);
+				$$ = n;
+			}
 		;
 
 
diff --git a/src/backend/utils/adt/acl.c b/src/backend/utils/adt/acl.c
index 5298e3dfe3..e0a3f6e6dd 100644
--- a/src/backend/utils/adt/acl.c
+++ b/src/backend/utils/adt/acl.c
@@ -316,6 +316,9 @@ aclparse(const char *s, AclItem *aip)
 			case ACL_CONNECT_CHR:
 				read = ACL_CONNECT;
 				break;
+			case ACL_CREATE_SUBSCRIPTION_CHR:
+				read = ACL_CREATE_SUBSCRIPTION;
+				break;
 			case 'R':			/* ignore old RULE privileges */
 				read = 0;
 				break;
@@ -1617,6 +1620,8 @@ convert_priv_string(text *priv_type_text)
 		return ACL_CREATE_TEMP;
 	if (pg_strcasecmp(priv_type, "CONNECT") == 0)
 		return ACL_CONNECT;
+	if (pg_strcasecmp(priv_type, "CREATE SUBSCRIPTION") == 0)
+		return ACL_CREATE_SUBSCRIPTION;
 	if (pg_strcasecmp(priv_type, "RULE") == 0)
 		return 0;				/* ignore old RULE privileges */
 
@@ -1713,6 +1718,8 @@ convert_aclright_to_string(int aclright)
 			return "TEMPORARY";
 		case ACL_CONNECT:
 			return "CONNECT";
+		case ACL_CREATE_SUBSCRIPTION:
+			return "CREATE SUBSCRIPTION";
 		default:
 			elog(ERROR, "unrecognized aclright: %d", aclright);
 			return NULL;
@@ -3048,6 +3055,8 @@ convert_database_priv_string(text *priv_type_text)
 		{"TEMP WITH GRANT OPTION", ACL_GRANT_OPTION_FOR(ACL_CREATE_TEMP)},
 		{"CONNECT", ACL_CONNECT},
 		{"CONNECT WITH GRANT OPTION", ACL_GRANT_OPTION_FOR(ACL_CONNECT)},
+		{"CREATE SUBSCRIPTION", ACL_CREATE_SUBSCRIPTION},
+		{"CREATE SUBSCRIPTION WITH GRANT OPTION", ACL_GRANT_OPTION_FOR(ACL_CREATE_SUBSCRIPTION)},
 		{NULL, 0}
 	};
 
diff --git a/src/bin/pg_dump/dumputils.c b/src/bin/pg_dump/dumputils.c
index 2e49d5d12a..c7f10b7bd1 100644
--- a/src/bin/pg_dump/dumputils.c
+++ b/src/bin/pg_dump/dumputils.c
@@ -532,6 +532,8 @@ do { \
 		CONVERT_PRIV('C', "CREATE");
 		CONVERT_PRIV('c', "CONNECT");
 		CONVERT_PRIV('T', "TEMPORARY");
+		if (remoteVersion >= 100000)
+			CONVERT_PRIV('S', "CREATE SUBSCRIPTION");
 	}
 	else if (strcmp(type, "TABLESPACE") == 0)
 		CONVERT_PRIV('C', "CREATE");
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 276489e556..4ec77e53c2 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -75,7 +75,8 @@ typedef uint32 AclMode;			/* a bitmask of privilege bits */
 #define ACL_CREATE		(1<<9)	/* for namespaces and databases */
 #define ACL_CREATE_TEMP (1<<10) /* for databases */
 #define ACL_CONNECT		(1<<11) /* for databases */
-#define N_ACL_RIGHTS	12		/* 1 plus the last 1<<x */
+#define ACL_CREATE_SUBSCRIPTION	(1<<12)
+#define N_ACL_RIGHTS	13		/* 1 plus the last 1<<x */
 #define ACL_NO_RIGHTS	0
 /* Currently, SELECT ... FOR [KEY] UPDATE/SHARE requires UPDATE privileges */
 #define ACL_SELECT_FOR_UPDATE	ACL_UPDATE
diff --git a/src/include/utils/acl.h b/src/include/utils/acl.h
index 5a5836fd4a..a9fdb3250c 100644
--- a/src/include/utils/acl.h
+++ b/src/include/utils/acl.h
@@ -139,9 +139,10 @@ typedef ArrayType Acl;
 #define ACL_CREATE_CHR			'C'
 #define ACL_CREATE_TEMP_CHR		'T'
 #define ACL_CONNECT_CHR			'c'
+#define ACL_CREATE_SUBSCRIPTION_CHR 'S'
 
 /* string holding all privilege code chars, in order by bitmask position */
-#define ACL_ALL_RIGHTS_STR	"arwdDxtXUCTc"
+#define ACL_ALL_RIGHTS_STR	"arwdDxtXUCTcS"
 
 /*
  * Bitmasks defining "all rights" for each supported object type
@@ -149,7 +150,7 @@ typedef ArrayType Acl;
 #define ACL_ALL_RIGHTS_COLUMN		(ACL_INSERT|ACL_SELECT|ACL_UPDATE|ACL_REFERENCES)
 #define ACL_ALL_RIGHTS_RELATION		(ACL_INSERT|ACL_SELECT|ACL_UPDATE|ACL_DELETE|ACL_TRUNCATE|ACL_REFERENCES|ACL_TRIGGER)
 #define ACL_ALL_RIGHTS_SEQUENCE		(ACL_USAGE|ACL_SELECT|ACL_UPDATE)
-#define ACL_ALL_RIGHTS_DATABASE		(ACL_CREATE|ACL_CREATE_TEMP|ACL_CONNECT)
+#define ACL_ALL_RIGHTS_DATABASE		(ACL_CREATE|ACL_CREATE_TEMP|ACL_CONNECT|ACL_CREATE_SUBSCRIPTION)
 #define ACL_ALL_RIGHTS_FDW			(ACL_USAGE)
 #define ACL_ALL_RIGHTS_FOREIGN_SERVER (ACL_USAGE)
 #define ACL_ALL_RIGHTS_FUNCTION		(ACL_EXECUTE)
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 3471d88ca7..b5aa15336b 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -2,8 +2,9 @@
 -- SUBSCRIPTION
 --
 CREATE ROLE regress_subscription_user LOGIN SUPERUSER;
+CREATE ROLE regress_subscription_user2 LOGIN;
 CREATE ROLE regress_subscription_user_dummy LOGIN NOSUPERUSER;
-SET SESSION AUTHORIZATION 'regress_subscription_user';
+SET SESSION AUTHORIZATION regress_subscription_user;
 -- fail - no publications
 CREATE SUBSCRIPTION testsub CONNECTION 'foo';
 ERROR:  syntax error at or near ";"
@@ -90,6 +91,17 @@ COMMIT;
 BEGIN;
 DROP SUBSCRIPTION testsub NODROP SLOT;
 COMMIT;
+-- permissions
+set client_min_messages to error;
+SET SESSION AUTHORIZATION regress_subscription_user2;
+CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (DISABLED, NOCREATE SLOT);  -- fail
+ERROR:  permission denied for database regression
+SET SESSION AUTHORIZATION regress_subscription_user;
+GRANT CREATE SUBSCRIPTION ON DATABASE regression TO regress_subscription_user2;
+SET SESSION AUTHORIZATION regress_subscription_user2;
+CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (DISABLED, NOCREATE SLOT);  -- ok
+reset client_min_messages;
+DROP SUBSCRIPTION testsub2 NODROP SLOT;
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user_dummy;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 5c05b14f9e..4b19110abc 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -3,8 +3,9 @@
 --
 
 CREATE ROLE regress_subscription_user LOGIN SUPERUSER;
+CREATE ROLE regress_subscription_user2 LOGIN;
 CREATE ROLE regress_subscription_user_dummy LOGIN NOSUPERUSER;
-SET SESSION AUTHORIZATION 'regress_subscription_user';
+SET SESSION AUTHORIZATION regress_subscription_user;
 
 -- fail - no publications
 CREATE SUBSCRIPTION testsub CONNECTION 'foo';
@@ -65,6 +66,17 @@ CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub
 DROP SUBSCRIPTION testsub NODROP SLOT;
 COMMIT;
 
+-- permissions
+set client_min_messages to error;
+SET SESSION AUTHORIZATION regress_subscription_user2;
+CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (DISABLED, NOCREATE SLOT);  -- fail
+SET SESSION AUTHORIZATION regress_subscription_user;
+GRANT CREATE SUBSCRIPTION ON DATABASE regression TO regress_subscription_user2;
+SET SESSION AUTHORIZATION regress_subscription_user2;
+CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (DISABLED, NOCREATE SLOT);  -- ok
+reset client_min_messages;
+DROP SUBSCRIPTION testsub2 NODROP SLOT;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user_dummy;
-- 
2.12.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to