On Fri, 2023-12-29 at 15:22 -0800, Jeff Davis wrote:
> On Tue, 2023-09-05 at 12:08 -0700, Jeff Davis wrote:
> > OK, so we could have a built-in FDW called pg_connection that would
> > do
> > the right kinds of validation; and then also allow other FDWs but
> > the
> > subscription would have to do its own validation.
>
> Attached a rough rebased version.
Attached a slightly better version which fixes a pg_dump issue and
improves the documentation.
Regards,
Jeff Davis
From 0b8cb23157b86909d38cc10723f19d94787efed2 Mon Sep 17 00:00:00 2001
From: Jeff Davis <[email protected]>
Date: Wed, 23 Aug 2023 10:31:16 -0700
Subject: [PATCH v4] CREATE SUBSCRIPTION ... SERVER.
---
doc/src/sgml/ref/alter_subscription.sgml | 18 +-
doc/src/sgml/ref/create_subscription.sgml | 16 +-
doc/src/sgml/user-manag.sgml | 12 +-
src/backend/catalog/Makefile | 1 +
src/backend/catalog/pg_subscription.c | 17 +-
src/backend/catalog/system_functions.sql | 2 +
src/backend/commands/subscriptioncmds.c | 197 ++++++++++++++--
src/backend/foreign/foreign.c | 214 ++++++++++++++++++
src/backend/parser/gram.y | 20 ++
src/backend/replication/logical/worker.c | 12 +-
src/bin/pg_dump/pg_dump.c | 63 +++++-
src/bin/pg_dump/pg_dump.h | 1 +
src/bin/psql/describe.c | 2 +-
src/bin/psql/tab-complete.c | 2 +-
src/include/catalog/meson.build | 1 +
src/include/catalog/pg_authid.dat | 5 +
.../catalog/pg_foreign_data_wrapper.dat | 22 ++
src/include/catalog/pg_proc.dat | 8 +
src/include/catalog/pg_subscription.h | 5 +-
src/include/foreign/foreign.h | 1 +
src/include/nodes/parsenodes.h | 3 +
src/test/regress/expected/foreign_data.out | 60 ++++-
src/test/regress/expected/subscription.out | 38 ++++
src/test/regress/sql/foreign_data.sql | 41 +++-
src/test/regress/sql/subscription.sql | 39 ++++
src/test/subscription/t/001_rep_changes.pl | 57 +++++
26 files changed, 804 insertions(+), 53 deletions(-)
create mode 100644 src/include/catalog/pg_foreign_data_wrapper.dat
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 6d36ff0dc9..6d219145a9 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -21,6 +21,7 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SERVER <replaceable>servername</replaceable>
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>'
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
@@ -94,13 +95,24 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
</listitem>
</varlistentry>
+ <varlistentry id="sql-altersubscription-params-server">
+ <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
+ <listitem>
+ <para>
+ This clause replaces the foreign server or connection string originally
+ set by <xref linkend="sql-createsubscription"/> with the foreign server
+ <replaceable>servername</replaceable>.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="sql-altersubscription-params-connection">
<term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
<listitem>
<para>
- This clause replaces the connection string originally set by
- <xref linkend="sql-createsubscription"/>. See there for more
- information.
+ This clause replaces the foreign server or connection string originally
+ set by <xref linkend="sql-createsubscription"/> with the connection
+ string <replaceable>conninfo</replaceable>.
</para>
</listitem>
</varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index f1c20b3a46..8cf67516cf 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -22,7 +22,7 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceable>
- CONNECTION '<replaceable class="parameter">conninfo</replaceable>'
+ { SERVER <replaceable class="parameter">servername</replaceable> | CONNECTION '<replaceable class="parameter">conninfo</replaceable>' }
PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...]
[ WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
</synopsis>
@@ -77,6 +77,15 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</listitem>
</varlistentry>
+ <varlistentry id="sql-createsubscription-params-server">
+ <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
+ <listitem>
+ <para>
+ A foreign server to use for the connection.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="sql-createsubscription-params-connection">
<term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
<listitem>
@@ -363,6 +372,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
The default is <literal>true</literal>. Only superusers can set
this value to <literal>false</literal>.
</para>
+ <para>
+ This parameter is only allowed when using a connection string. If using
+ a foreign server, set <literal>password_required</literal> in the user
+ mapping for the foreign server instead.
+ </para>
</listitem>
</varlistentry>
diff --git a/doc/src/sgml/user-manag.sgml b/doc/src/sgml/user-manag.sgml
index 92a299d2d3..4f4c20ba3c 100644
--- a/doc/src/sgml/user-manag.sgml
+++ b/doc/src/sgml/user-manag.sgml
@@ -687,11 +687,19 @@ DROP ROLE doomed_role;
<entry>Allow use of connection slots reserved via
<xref linkend="guc-reserved-connections"/>.</entry>
</row>
+ <row>
+ <entry>pg_create_connection</entry>
+ <entry>Allow users to specify a connection string directly in <link
+ linkend="sql-createsubscription"><command>CREATE
+ SUBSCRIPTION</command></link>.</entry>
+ </row>
<row>
<entry>pg_create_subscription</entry>
<entry>Allow users with <literal>CREATE</literal> permission on the
- database to issue
- <link linkend="sql-createsubscription"><command>CREATE SUBSCRIPTION</command></link>.</entry>
+ database to issue <link
+ linkend="sql-createsubscription"><command>CREATE
+ SUBSCRIPTION</command></link>. This role is a member of
+ <literal>pg_create_connection</literal>.</entry>
</row>
</tbody>
</tgroup>
diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile
index ec7b6f5362..365c956dea 100644
--- a/src/backend/catalog/Makefile
+++ b/src/backend/catalog/Makefile
@@ -136,6 +136,7 @@ POSTGRES_BKI_DATA = $(addprefix $(top_srcdir)/src/include/catalog/,\
pg_collation.dat \
pg_conversion.dat \
pg_database.dat \
+ pg_foreign_data_wrapper.dat \
pg_language.dat \
pg_namespace.dat \
pg_opclass.dat \
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d6a978f136..f5c4ec8d99 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -23,6 +23,7 @@
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
+#include "foreign/foreign.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "storage/lmgr.h"
@@ -75,10 +76,18 @@ GetSubscription(Oid subid, bool missing_ok)
sub->runasowner = subform->subrunasowner;
/* Get conninfo */
- datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
- tup,
- Anum_pg_subscription_subconninfo);
- sub->conninfo = TextDatumGetCString(datum);
+ if (OidIsValid(subform->subserver))
+ {
+ sub->conninfo = ForeignServerConnectionString(subform->subowner,
+ subform->subserver);
+ }
+ else
+ {
+ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
+ tup,
+ Anum_pg_subscription_subconninfo);
+ sub->conninfo = TextDatumGetCString(datum);
+ }
/* Get slotname */
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 4206752881..e1abda41ba 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -781,3 +781,5 @@ GRANT pg_read_all_settings TO pg_monitor;
GRANT pg_read_all_stats TO pg_monitor;
GRANT pg_stat_scan_tables TO pg_monitor;
+
+GRANT pg_create_connection TO pg_create_subscription;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index edc82c11be..c46900182e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -25,14 +25,17 @@
#include "catalog/objectaddress.h"
#include "catalog/pg_authid_d.h"
#include "catalog/pg_database_d.h"
+#include "catalog/pg_foreign_server.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
+#include "catalog/pg_user_mapping.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/subscriptioncmds.h"
#include "executor/executor.h"
+#include "foreign/foreign.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "pgstat.h"
@@ -574,6 +577,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
Datum values[Natts_pg_subscription];
Oid owner = GetUserId();
HeapTuple tup;
+ Oid serverid;
+ Oid umid;
char *conninfo;
char originname[NAMEDATALEN];
List *publications;
@@ -594,6 +599,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
+ if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED) && stmt->servername)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("option \"password_required\" invalid on subscriptions to a foreign server"),
+ errhint("Use the \"password_required\" option on the user mappings associated with the foreign server.")));
+
/*
* Since creating a replication slot is not transactional, rolling back
* the transaction leaves the created replication slot. So we cannot run
@@ -604,9 +615,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
/*
- * We don't want to allow unprivileged users to be able to trigger
- * attempts to access arbitrary network destinations, so require the user
- * to have been specifically authorized to create subscriptions.
+ * We don't want to allow unprivileged users to utilize the resources that
+ * a subscription requires (such as a background worker), so require the
+ * user to have been specifically authorized to create subscriptions.
*/
if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
ereport(ERROR,
@@ -666,14 +677,54 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
if (opts.synchronous_commit == NULL)
opts.synchronous_commit = "off";
- conninfo = stmt->conninfo;
- publications = stmt->publication;
-
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
- /* Check the connection info string. */
- walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
+ if (stmt->servername)
+ {
+ ForeignServer *server;
+ UserMapping *um;
+
+ Assert(!stmt->conninfo);
+ conninfo = NULL;
+
+ server = GetForeignServerByName(stmt->servername, false);
+ aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
+
+ um = GetUserMapping(owner, server->serverid);
+
+ serverid = server->serverid;
+ umid = um->umid;
+ conninfo = ForeignServerConnectionString(owner, serverid);
+ }
+ else
+ {
+ Assert(stmt->conninfo);
+
+ /*
+ * We don't want to allow unprivileged users to be able to trigger
+ * attempts to access arbitrary network destinations, so require the user
+ * to have been specifically authorized to create connections.
+ */
+ if (!has_privs_of_role(owner, ROLE_PG_CREATE_CONNECTION))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied to create subscription with a connection string"),
+ errdetail("Only roles with privileges of the \"%s\" role may create subscriptions with CONNECTION specified.",
+ "pg_create_connection"),
+ errhint("Create a subscription to a foreign server by specifying SERVER instead.")));
+
+ /* Check the connection info string. */
+ walrcv_check_conninfo(stmt->conninfo, opts.passwordrequired && !superuser());
+
+ serverid = InvalidOid;
+ umid = InvalidOid;
+ conninfo = stmt->conninfo;
+ }
+
+ publications = stmt->publication;
/* Everything ok, form a new tuple. */
memset(values, 0, sizeof(values));
@@ -697,8 +748,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
- values[Anum_pg_subscription_subconninfo - 1] =
- CStringGetTextDatum(conninfo);
+ values[Anum_pg_subscription_subserver - 1] = serverid;
+ if (!OidIsValid(serverid))
+ values[Anum_pg_subscription_subconninfo - 1] =
+ CStringGetTextDatum(conninfo);
+ else
+ nulls[Anum_pg_subscription_subconninfo - 1] = true;
if (opts.slot_name)
values[Anum_pg_subscription_subslotname - 1] =
DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
@@ -719,6 +774,20 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
+ ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
+ if (stmt->servername)
+ {
+ ObjectAddress referenced;
+ Assert(OidIsValid(serverid) && OidIsValid(umid));
+
+ ObjectAddressSet(referenced, ForeignServerRelationId, serverid);
+ recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+
+ ObjectAddressSet(referenced, UserMappingRelationId, umid);
+ recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+ }
+
ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
replorigin_create(originname);
@@ -835,8 +904,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
if (opts.enabled)
ApplyLauncherWakeupAtCommit();
- ObjectAddressSet(myself, SubscriptionRelationId, subid);
-
InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
return myself;
@@ -1124,6 +1191,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
memset(nulls, false, sizeof(nulls));
memset(replaces, false, sizeof(replaces));
+ ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
switch (stmt->kind)
{
case ALTER_SUBSCRIPTION_OPTIONS:
@@ -1191,6 +1260,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
{
+ if (OidIsValid(form->subserver))
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("option \"password_required\" invalid on subscriptions to a foreign server"),
+ errhint("Use the \"password_required\" option on the user mappings associated with the foreign server.")));
+
/* Non-superuser may not disable password_required. */
if (!opts.passwordrequired && !superuser())
ereport(ERROR,
@@ -1244,7 +1319,77 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
break;
}
+ case ALTER_SUBSCRIPTION_SERVER:
+ {
+ ForeignServer *new_server;
+ UserMapping *new_um;
+ ObjectAddress referenced;
+ AclResult aclresult;
+
+ /*
+ * Remove what was there before, either another foreign server
+ * or a connection string.
+ */
+ if (form->subserver)
+ {
+ UserMapping *old_um = GetUserMapping(form->subowner, form->subserver);
+
+ deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+ DEPENDENCY_NORMAL,
+ ForeignServerRelationId, form->subserver);
+ deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+ DEPENDENCY_NORMAL,
+ UserMappingRelationId, old_um->umid);
+ }
+ else
+ {
+ nulls[Anum_pg_subscription_subconninfo - 1] = true;
+ replaces[Anum_pg_subscription_subconninfo - 1] = true;
+ }
+
+ /*
+ * Find the new server and user mapping. Check ACL of server
+ * based on current user ID, but find the user mapping based
+ * on the subscription owner.
+ */
+ new_server = GetForeignServerByName(stmt->servername, false);
+ aclresult = object_aclcheck(ForeignServerRelationId,
+ new_server->serverid, GetUserId(), ACL_USAGE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER,
+ new_server->servername);
+
+ new_um = GetUserMapping(form->subowner, new_server->serverid);
+
+ values[Anum_pg_subscription_subserver - 1] = new_server->serverid;
+ replaces[Anum_pg_subscription_subserver - 1] = true;
+
+ ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid);
+ recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+
+ ObjectAddressSet(referenced, UserMappingRelationId, new_um->umid);
+ recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+ update_tuple = true;
+ }
+ break;
+
case ALTER_SUBSCRIPTION_CONNECTION:
+ /* remove reference to foreign server and dependencies, if present */
+ if (form->subserver)
+ {
+ UserMapping *old_um = GetUserMapping(form->subowner, form->subserver);
+
+ deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+ DEPENDENCY_NORMAL,
+ ForeignServerRelationId, form->subserver);
+ deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+ DEPENDENCY_NORMAL,
+ UserMappingRelationId, old_um->umid);
+
+ values[Anum_pg_subscription_subserver - 1] = InvalidOid;
+ replaces[Anum_pg_subscription_subserver - 1] = true;
+ }
+
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
/* Check the connection info string. */
@@ -1455,8 +1600,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
table_close(rel, RowExclusiveLock);
- ObjectAddressSet(myself, SubscriptionRelationId, subid);
-
InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
/* Wake up related replication workers to handle this change quickly. */
@@ -1541,9 +1684,17 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
subname = pstrdup(NameStr(*DatumGetName(datum)));
/* Get conninfo */
- datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
- Anum_pg_subscription_subconninfo);
- conninfo = TextDatumGetCString(datum);
+ if (OidIsValid(form->subserver))
+ {
+ conninfo = ForeignServerConnectionString(form->subowner,
+ form->subserver);
+ }
+ else
+ {
+ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
+ Anum_pg_subscription_subconninfo);
+ conninfo = TextDatumGetCString(datum);
+ }
/* Get slotname */
datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
@@ -1644,6 +1795,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
}
/* Clean up dependencies */
+ deleteDependencyRecordsFor(SubscriptionRelationId, subid, false);
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
/* Remove any associated relation synchronization states. */
@@ -1853,6 +2005,17 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
aclcheck_error(aclresult, OBJECT_DATABASE,
get_database_name(MyDatabaseId));
+ if (form->subserver)
+ {
+ UserMapping *old_um = GetUserMapping(form->subowner, form->subserver);
+ UserMapping *new_um = GetUserMapping(newOwnerId, form->subserver);
+
+ if (changeDependencyFor(SubscriptionRelationId, form->oid,
+ UserMappingRelationId, old_um->umid, new_um->umid) != 1)
+ elog(ERROR, "could not change user mapping dependency for subscription %u",
+ form->oid);
+ }
+
form->subowner = newOwnerId;
CatalogTupleUpdate(rel, &tup->t_self, tup);
diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c
index fc3edef2a8..5a800fc48f 100644
--- a/src/backend/foreign/foreign.c
+++ b/src/backend/foreign/foreign.c
@@ -18,11 +18,14 @@
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_foreign_table.h"
#include "catalog/pg_user_mapping.h"
+#include "commands/defrem.h"
#include "foreign/fdwapi.h"
#include "foreign/foreign.h"
#include "funcapi.h"
#include "lib/stringinfo.h"
+#include "mb/pg_wchar.h"
#include "miscadmin.h"
+#include "replication/walreceiver.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/rel.h"
@@ -190,6 +193,94 @@ GetForeignServerByName(const char *srvname, bool missing_ok)
}
+/*
+ * Escape a connection option value. Helper for options_to_connstr().
+ */
+static char *
+escape_value(char *val)
+{
+ StringInfoData result;
+
+ initStringInfo(&result);
+
+ for (int i = 0; val[i] != '\0'; i++)
+ {
+ if (val[i] == '\\' || val[i] == '\'')
+ appendStringInfoChar(&result, '\\');
+ appendStringInfoChar(&result, val[i]);
+ }
+
+ return result.data;
+}
+
+
+/*
+ * Helper for ForeignServerConnectionString() and pg_connection_validator().
+ *
+ * Transform a List of DefElem into a connection string.
+ *
+ * XXX: might leak memory, investigate
+ */
+static char *
+options_to_connstr(List *options)
+{
+ StringInfoData connstr;
+ ListCell *lc;
+ bool first = true;
+
+ initStringInfo(&connstr);
+ foreach(lc, options)
+ {
+ DefElem *d = (DefElem *) lfirst(lc);
+ char *name = d->defname;
+ char *value;
+
+ /* not a libpq option; skip */
+ if (strcmp(name, "password_required") == 0)
+ continue;
+
+ /* XXX: pfree() result of defGetString() if needed? */
+ value = escape_value(defGetString(d));
+
+ appendStringInfo(&connstr, "%s%s = '%s'",
+ first ? "" : " ", name, value);
+ first = false;
+
+ pfree(value);
+ }
+
+ /* override client_encoding */
+ appendStringInfo(&connstr, "%sclient_encoding = '%s'",
+ first ? "" : " ", GetDatabaseEncodingName());
+
+ return connstr.data;
+}
+
+
+/*
+ * Given a user ID and server ID, return a postgres connection string suitable
+ * to pass to libpq.
+ *
+ * XXX: might leak memory, investigate
+ */
+char *
+ForeignServerConnectionString(Oid userid, Oid serverid)
+{
+ ForeignServer *server = GetForeignServer(serverid);
+ UserMapping *um = GetUserMapping(userid, serverid);
+ List *options = list_concat(um->options, server->options);
+ char *connstr;
+
+ connstr = options_to_connstr(options);
+
+ pfree(server);
+ pfree(um);
+ list_free(options);
+
+ return connstr;
+}
+
+
/*
* GetUserMapping - look up the user mapping.
*
@@ -599,6 +690,129 @@ is_conninfo_option(const char *option, Oid context)
return false;
}
+/*
+ * pg_connection_handler
+ *
+ * pg_connection_fdw is not used for foreign tables, so the handler should
+ * never be called.
+ */
+Datum
+pg_connection_handler(PG_FUNCTION_ARGS)
+{
+ elog(ERROR, "pg_connection_handler unexpectedly called");
+}
+
+/*
+ * Option validator for pg_connection_fdw servers and user mappings.
+ *
+ * XXX: try to unify with validators for CREATE SUBSCRIPTION ... CONNECTION,
+ * postgres_fdw, and dblink. Also investigate if memory leaks are a problem
+ * here.
+ */
+Datum
+pg_connection_validator(PG_FUNCTION_ARGS)
+{
+ List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
+ Oid catalog = PG_GETARG_OID(1);
+
+ if (catalog == ForeignServerRelationId)
+ {
+ char *conninfo;
+ ListCell *lc;
+
+ foreach(lc, options_list)
+ {
+ DefElem *d = (DefElem *) lfirst(lc);
+
+ if (strcmp(d->defname, "client_encoding") == 0)
+ ereport(ERROR,
+ (errmsg("cannot specify client_encoding for pg_connection_fdw")));
+
+ if (strcmp(d->defname, "user") == 0 ||
+ strcmp(d->defname, "password") == 0 ||
+ strcmp(d->defname, "sslpassword") == 0 ||
+ strcmp(d->defname, "password_required") == 0)
+ ereport(ERROR,
+ (errmsg("invalid option \"%s\" for pg_connection_fdw",
+ d->defname),
+ errhint("Specify option \"%s\" for a user mapping associated with the server instead.",
+ d->defname)));
+ }
+
+ conninfo = options_to_connstr(options_list);
+
+ /* Load the library providing us libpq calls. */
+ load_file("libpqwalreceiver", false);
+
+ walrcv_check_conninfo(conninfo, false);
+ }
+ else if (catalog == UserMappingRelationId)
+ {
+ bool password_required = true;
+ bool password_provided = false;
+ ListCell *lc;
+
+ foreach(lc, options_list)
+ {
+ DefElem *d = (DefElem *) lfirst(lc);
+
+ if (strcmp(d->defname, "password_required") == 0)
+ {
+ /*
+ * Only the superuser may set this option on a user mapping, or
+ * alter a user mapping on which this option is set. We allow a
+ * user to clear this option if it's set - in fact, we don't have
+ * a choice since we can't see the old mapping when validating an
+ * alter.
+ */
+ if (!superuser() && !defGetBoolean(d))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("password_required=false is superuser-only for pg_connection_fdw"),
+ errhint("User mappings with the password_required option set to false may only be created or modified by the superuser.")));
+
+ password_required = defGetBoolean(d);
+ }
+
+ if ((strcmp(d->defname, "sslkey") == 0 || strcmp(d->defname, "sslcert") == 0) && !superuser())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("sslcert and sslkey are superuser-only for pg_connection_fdw"),
+ errhint("User mappings with the sslcert or sslkey options set may only be created or modified by the superuser.")));
+
+ if (strcmp(d->defname, "password") == 0)
+ password_provided = true;
+
+ if (strcmp(d->defname, "user") != 0 &&
+ strcmp(d->defname, "password") != 0 &&
+ strcmp(d->defname, "sslpassword") != 0 &&
+ strcmp(d->defname, "sslkey") != 0 &&
+ strcmp(d->defname, "sslcert") != 0 &&
+ strcmp(d->defname, "password_required") != 0)
+ ereport(ERROR,
+ (errmsg("invalid user mapping option \"%s\" for pg_connection_fdw",
+ d->defname)));
+ }
+
+ if (password_required && !password_provided)
+ ereport(ERROR,
+ (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+ errmsg("password is required"),
+ errdetail("Non-superusers must provide a password in the connection string.")));
+ }
+ else if (catalog == ForeignTableRelationId)
+ ereport(ERROR,
+ (errmsg("cannot create foreign table using a pg_connection_fdw server"),
+ errhint("Use the server with CREATE SUBSCRIPTION instead.")));
+ else if (catalog == AttributeRelationId)
+ elog(ERROR, "unexpected call to pg_connection_validator for pg_attribute catalog");
+ else
+ elog(ERROR, "unexpected call to pg_connection_validator for catalog %d", catalog);
+
+
+ PG_RETURN_BOOL(true);
+}
+
/*
* Validate the generic option given to SERVER or USER MAPPING.
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 63f172e175..259b5ca42b 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -10638,6 +10638,16 @@ CreateSubscriptionStmt:
n->options = $8;
$$ = (Node *) n;
}
+ | CREATE SUBSCRIPTION name SERVER name PUBLICATION name_list opt_definition
+ {
+ CreateSubscriptionStmt *n =
+ makeNode(CreateSubscriptionStmt);
+ n->subname = $3;
+ n->servername = $5;
+ n->publication = $7;
+ n->options = $8;
+ $$ = (Node *) n;
+ }
;
/*****************************************************************************
@@ -10667,6 +10677,16 @@ AlterSubscriptionStmt:
n->conninfo = $5;
$$ = (Node *) n;
}
+ | ALTER SUBSCRIPTION name SERVER name
+ {
+ AlterSubscriptionStmt *n =
+ makeNode(AlterSubscriptionStmt);
+
+ n->kind = ALTER_SUBSCRIPTION_SERVER;
+ n->subname = $3;
+ n->servername = $5;
+ $$ = (Node *) n;
+ }
| ALTER SUBSCRIPTION name REFRESH PUBLICATION opt_definition
{
AlterSubscriptionStmt *n =
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 21abf34ef7..be63ab5a5d 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4003,7 +4003,9 @@ maybe_reread_subscription(void)
}
/*
- * Callback from subscription syscache invalidation.
+ * Callback from subscription syscache invalidation. Also needed for server or
+ * user mapping invalidation, which can change the connection information for
+ * subscriptions that connect using a server object.
*/
static void
subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -4639,6 +4641,14 @@ InitializeLogRepWorker(void)
CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
subscription_change_cb,
(Datum) 0);
+ /* Foreign server changes can alter a subscription's connection info. */
+ CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+ subscription_change_cb,
+ (Datum) 0);
+ /* User mapping changes can alter a subscription's connection info. */
+ CacheRegisterSyscacheCallback(USERMAPPINGOID,
+ subscription_change_cb,
+ (Datum) 0);
CacheRegisterSyscacheCallback(AUTHOID,
subscription_change_cb,
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 050a831226..b0664b6bbb 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -2066,6 +2066,27 @@ selectDumpableStatisticsObject(StatsExtInfo *sobj, Archive *fout)
sobj->dobj.dump = DUMP_COMPONENT_NONE;
}
+/*
+ * selectDumpableFdw: policy-setting subroutine
+ * Mark foreign data wrapper as to be dumped or not
+ *
+ * Foreign data wrappers do not belong to any particular namespace. To
+ * identify built-in foreign data wrappers, we must resort to checking whether
+ * the foreign data wrapper's OID is in the range reserved for initdb.
+ */
+static void
+selectDumpableFdw(FdwInfo *fdwinfo, Archive *fout)
+{
+ if (checkExtensionMembership(&fdwinfo->dobj, fout))
+ return; /* extension membership overrides all else */
+
+ if (fdwinfo->dobj.catId.oid <= (Oid) g_last_builtin_oid)
+ fdwinfo->dobj.dump = DUMP_COMPONENT_NONE;
+ else
+ fdwinfo->dobj.dump = fout->dopt->include_everything ?
+ DUMP_COMPONENT_ALL : DUMP_COMPONENT_NONE;
+}
+
/*
* selectDumpableObject: policy-setting subroutine
* Mark a generic dumpable object as to be dumped or not
@@ -4633,6 +4654,7 @@ getSubscriptions(Archive *fout)
int i_subdisableonerr;
int i_subpasswordrequired;
int i_subrunasowner;
+ int i_subservername;
int i_subconninfo;
int i_subslotname;
int i_subsynccommit;
@@ -4693,18 +4715,27 @@ getSubscriptions(Archive *fout)
appendPQExpBufferStr(query,
" s.subpasswordrequired,\n"
" s.subrunasowner,\n"
- " s.suborigin\n");
+ " s.suborigin,\n");
else
appendPQExpBuffer(query,
" 't' AS subpasswordrequired,\n"
" 't' AS subrunasowner,\n"
- " '%s' AS suborigin\n",
+ " '%s' AS suborigin,\n",
LOGICALREP_ORIGIN_ANY);
- appendPQExpBufferStr(query,
- "FROM pg_subscription s\n"
- "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
- " WHERE datname = current_database())");
+ if (fout->remoteVersion >= 170000)
+ appendPQExpBufferStr(query,
+ " fs.srvname AS subservername\n"
+ "FROM pg_subscription s LEFT JOIN \n"
+ "pg_foreign_server fs ON s.subserver = fs.oid\n"
+ "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
+ " WHERE datname = current_database())");
+ else
+ appendPQExpBufferStr(query,
+ " NULL AS subservername\n"
+ "FROM pg_subscription s\n"
+ "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
+ " WHERE datname = current_database())");
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
@@ -4724,6 +4755,7 @@ getSubscriptions(Archive *fout)
i_subdisableonerr = PQfnumber(res, "subdisableonerr");
i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
i_subrunasowner = PQfnumber(res, "subrunasowner");
+ i_subservername = PQfnumber(res, "subservername");
i_subconninfo = PQfnumber(res, "subconninfo");
i_subslotname = PQfnumber(res, "subslotname");
i_subsynccommit = PQfnumber(res, "subsynccommit");
@@ -4741,7 +4773,10 @@ getSubscriptions(Archive *fout)
AssignDumpId(&subinfo[i].dobj);
subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner));
-
+ if (PQgetisnull(res, i, i_subservername))
+ subinfo[i].subservername = NULL;
+ else
+ subinfo[i].subservername = pg_strdup(PQgetvalue(res, i, i_subservername));
subinfo[i].subbinary =
pg_strdup(PQgetvalue(res, i, i_subbinary));
subinfo[i].substream =
@@ -4804,9 +4839,17 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
qsubname);
- appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
+ appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s ",
qsubname);
- appendStringLiteralAH(query, subinfo->subconninfo, fout);
+ if (subinfo->subservername)	/* subscription names a foreign server */
+ {
+ appendPQExpBuffer(query, "SERVER %s", fmtId(subinfo->subservername));
+ }
+ else	/* no server name: emit the literal connection string */
+ {
+ appendPQExpBufferStr(query, "CONNECTION ");
+ appendStringLiteralAH(query, subinfo->subconninfo, fout);
+ }
/* Build list of quoted publications and append them to query. */
if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
@@ -9513,7 +9556,7 @@ getForeignDataWrappers(Archive *fout, int *numForeignDataWrappers)
fdwinfo[i].fdwoptions = pg_strdup(PQgetvalue(res, i, i_fdwoptions));
/* Decide whether we want to dump it */
- selectDumpableObject(&(fdwinfo[i].dobj), fout);
+ selectDumpableFdw(&fdwinfo[i], fout);
/* Mark whether FDW has an ACL */
if (!PQgetisnull(res, i, i_fdwacl))
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 673ca5c92d..0a94b9b7ae 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -667,6 +667,7 @@ typedef struct _SubscriptionInfo
char *subdisableonerr;
char *subpasswordrequired;
char *subrunasowner;
+ char *subservername;
char *subconninfo;
char *subslotname;
char *subsynccommit;
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 5077e7b358..fb4ea96968 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5793,7 +5793,7 @@ listForeignDataWrappers(const char *pattern, bool verbose)
gettext_noop("Description"));
}
- appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_foreign_data_wrapper fdw\n");
+ appendPQExpBufferStr(&buf, "\nFROM (SELECT tableoid, * FROM pg_catalog.pg_foreign_data_wrapper WHERE oid>=16384) fdw\n");
if (verbose)
appendPQExpBufferStr(&buf,
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 049801186c..230683a850 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -3314,7 +3314,7 @@ psql_completion(const char *text, int start, int end)
/* CREATE SUBSCRIPTION */
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny))
- COMPLETE_WITH("CONNECTION");
+ COMPLETE_WITH("SERVER", "CONNECTION");
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", MatchAny))
COMPLETE_WITH("PUBLICATION");
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION",
diff --git a/src/include/catalog/meson.build b/src/include/catalog/meson.build
index dcb3c5f766..01e92089bf 100644
--- a/src/include/catalog/meson.build
+++ b/src/include/catalog/meson.build
@@ -83,6 +83,7 @@ bki_data = [
'pg_collation.dat',
'pg_conversion.dat',
'pg_database.dat',
+ 'pg_foreign_data_wrapper.dat',
'pg_language.dat',
'pg_namespace.dat',
'pg_opclass.dat',
diff --git a/src/include/catalog/pg_authid.dat b/src/include/catalog/pg_authid.dat
index 6b4a0aaaad..8fce457ab1 100644
--- a/src/include/catalog/pg_authid.dat
+++ b/src/include/catalog/pg_authid.dat
@@ -94,5 +94,10 @@
rolcreaterole => 'f', rolcreatedb => 'f', rolcanlogin => 'f',
rolreplication => 'f', rolbypassrls => 'f', rolconnlimit => '-1',
rolpassword => '_null_', rolvaliduntil => '_null_' },
+{ oid => '6123', oid_symbol => 'ROLE_PG_CREATE_CONNECTION',
+ rolname => 'pg_create_connection', rolsuper => 'f', rolinherit => 't',
+ rolcreaterole => 'f', rolcreatedb => 'f', rolcanlogin => 'f',
+ rolreplication => 'f', rolbypassrls => 'f', rolconnlimit => '-1',
+ rolpassword => '_null_', rolvaliduntil => '_null_' },
]
diff --git a/src/include/catalog/pg_foreign_data_wrapper.dat b/src/include/catalog/pg_foreign_data_wrapper.dat
new file mode 100644
index 0000000000..7d489bf849
--- /dev/null
+++ b/src/include/catalog/pg_foreign_data_wrapper.dat
@@ -0,0 +1,22 @@
+#----------------------------------------------------------------------
+#
+# pg_foreign_data_wrapper.dat
+# Initial contents of the pg_foreign_data_wrapper system catalog.
+#
+# Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+# Portions Copyright (c) 1994, Regents of the University of California
+#
+# src/include/catalog/pg_foreign_data_wrapper.dat
+#
+#----------------------------------------------------------------------
+
+[
+
+{ oid => '6015', oid_symbol => 'PG_CONNECTION_FDW',
+ descr => 'Pseudo FDW for connections to Postgres',
+ fdwname => 'pg_connection_fdw', fdwowner => 'POSTGRES',
+ fdwhandler => 'pg_connection_handler',
+ fdwvalidator => 'pg_connection_validator',
+ fdwacl => '_null_', fdwoptions => '_null_'},
+
+]
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9052f5262a..8e81e5f6bb 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -7516,6 +7516,14 @@
proname => 'postgresql_fdw_validator', prorettype => 'bool',
proargtypes => '_text oid', prosrc => 'postgresql_fdw_validator' },
+{ oid => '6122', descr => '(internal)',
+ proname => 'pg_connection_handler', prorettype => 'fdw_handler',
+ proargtypes => '', prosrc => 'pg_connection_handler' },
+
+{ oid => '6124', descr => '(internal)',
+ proname => 'pg_connection_validator', prorettype => 'bool',
+ proargtypes => '_text oid', prosrc => 'pg_connection_validator' },
+
{ oid => '2290', descr => 'I/O',
proname => 'record_in', provolatile => 's', prorettype => 'record',
proargtypes => 'cstring oid int4', prosrc => 'record_in' },
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index e0b91eacd2..4b83c8ca8b 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -93,9 +93,12 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
bool subrunasowner; /* True if replication should execute as the
* subscription owner */
+ Oid subserver; /* Set if connecting with server */
+
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */
- text subconninfo BKI_FORCE_NOT_NULL;
+ text subconninfo BKI_FORCE_NULL; /* Set if connecting with
+ connection string */
/* Slot name on publisher */
NameData subslotname BKI_FORCE_NULL;
diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h
index 5256d4d91f..7058335d63 100644
--- a/src/include/foreign/foreign.h
+++ b/src/include/foreign/foreign.h
@@ -69,6 +69,7 @@ extern ForeignServer *GetForeignServerExtended(Oid serverid,
bits16 flags);
extern ForeignServer *GetForeignServerByName(const char *srvname,
bool missing_ok);
+extern char *ForeignServerConnectionString(Oid userid, Oid serverid);
extern UserMapping *GetUserMapping(Oid userid, Oid serverid);
extern ForeignDataWrapper *GetForeignDataWrapper(Oid fdwid);
extern ForeignDataWrapper *GetForeignDataWrapperExtended(Oid fdwid,
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index e494309da8..4b316d4664 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -4040,6 +4040,7 @@ typedef struct CreateSubscriptionStmt
{
NodeTag type;
char *subname; /* Name of the subscription */
+ char *servername; /* Server name of publisher */
char *conninfo; /* Connection string to publisher */
List *publication; /* One or more publication to subscribe to */
List *options; /* List of DefElem nodes */
@@ -4048,6 +4049,7 @@ typedef struct CreateSubscriptionStmt
typedef enum AlterSubscriptionType
{
ALTER_SUBSCRIPTION_OPTIONS,
+ ALTER_SUBSCRIPTION_SERVER,
ALTER_SUBSCRIPTION_CONNECTION,
ALTER_SUBSCRIPTION_SET_PUBLICATION,
ALTER_SUBSCRIPTION_ADD_PUBLICATION,
@@ -4062,6 +4064,7 @@ typedef struct AlterSubscriptionStmt
NodeTag type;
AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */
char *subname; /* Name of the subscription */
+ char *servername; /* Server name of publisher */
char *conninfo; /* Connection string to publisher */
List *publication; /* One or more publication to subscribe to */
List *options; /* List of DefElem nodes */
diff --git a/src/test/regress/expected/foreign_data.out b/src/test/regress/expected/foreign_data.out
index 1dfe23cc1e..b551405a3d 100644
--- a/src/test/regress/expected/foreign_data.out
+++ b/src/test/regress/expected/foreign_data.out
@@ -25,7 +25,7 @@ CREATE FOREIGN DATA WRAPPER dummy;
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
-- At this point we should have 2 built-in wrappers and no servers.
-SELECT fdwname, fdwhandler::regproc, fdwvalidator::regproc, fdwoptions FROM pg_foreign_data_wrapper ORDER BY 1, 2, 3;
+SELECT fdwname, fdwhandler::regproc, fdwvalidator::regproc, fdwoptions FROM pg_foreign_data_wrapper WHERE oid >= 16384 ORDER BY 1, 2, 3;
fdwname | fdwhandler | fdwvalidator | fdwoptions
------------+------------+--------------------------+------------
dummy | - | - |
@@ -394,6 +394,47 @@ CREATE SERVER t2 FOREIGN DATA WRAPPER foo;
RESET ROLE;
REVOKE regress_test_indirect FROM regress_test_role;
+-- test SERVER ... FOR CONNECTION ONLY
+SET ROLE regress_test_role;
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw; -- ERROR: not a member of pg_create_connection
+ERROR: permission denied for foreign-data wrapper pg_connection_fdw
+RESET ROLE;
+GRANT USAGE ON FOREIGN DATA WRAPPER pg_connection_fdw TO regress_test_role;
+SET ROLE regress_test_role;
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (client_encoding 'foo'); --fails
+ERROR: cannot specify client_encoding for pg_connection_fdw
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (user 'foo'); --fails
+ERROR: invalid option "user" for pg_connection_fdw
+HINT: Specify option "user" for a user mapping associated with the server instead.
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (password 'foo'); --fails
+ERROR: invalid option "password" for pg_connection_fdw
+HINT: Specify option "password" for a user mapping associated with the server instead.
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (password_required 'true'); --fails
+ERROR: invalid option "password_required" for pg_connection_fdw
+HINT: Specify option "password_required" for a user mapping associated with the server instead.
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw;
+IMPORT FOREIGN SCHEMA foo FROM SERVER t3 INTO bar; -- fails
+ERROR: schema "bar" does not exist
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x'); -- fails
+ERROR: password is required
+DETAIL: Non-superusers must provide a password in the connection string.
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password_required 'false'); -- fails
+ERROR: password_required=false is superuser-only for pg_connection_fdw
+HINT: User mappings with the password_required option set to false may only be created or modified by the superuser.
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', application_name 'nonsense'); -- fails
+ERROR: invalid user mapping option "application_name" for pg_connection_fdw
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+RESET ROLE;
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x'); -- still fails
+ERROR: password is required
+DETAIL: Non-superusers must provide a password in the connection string.
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password_required 'false');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+DROP SERVER t3;
+REVOKE USAGE ON FOREIGN DATA WRAPPER pg_connection_fdw FROM regress_test_role;
-- ALTER SERVER
ALTER SERVER s0; -- ERROR
ERROR: syntax error at or near ";"
@@ -966,13 +1007,14 @@ NOTICE: relation "doesnt_exist_ft1" does not exist, skipping
ALTER FOREIGN TABLE IF EXISTS doesnt_exist_ft1 RENAME TO foreign_table_1;
NOTICE: relation "doesnt_exist_ft1" does not exist, skipping
-- Information schema
-SELECT * FROM information_schema.foreign_data_wrappers ORDER BY 1, 2;
- foreign_data_wrapper_catalog | foreign_data_wrapper_name | authorization_identifier | library_name | foreign_data_wrapper_language
-------------------------------+---------------------------+---------------------------+--------------+-------------------------------
- regression | dummy | regress_foreign_data_user | | c
- regression | foo | regress_foreign_data_user | | c
- regression | postgresql | regress_foreign_data_user | | c
-(3 rows)
+SELECT foreign_data_wrapper_catalog,foreign_data_wrapper_name FROM information_schema.foreign_data_wrappers ORDER BY 1, 2;
+ foreign_data_wrapper_catalog | foreign_data_wrapper_name
+------------------------------+---------------------------
+ regression | dummy
+ regression | foo
+ regression | pg_connection_fdw
+ regression | postgresql
+(4 rows)
SELECT * FROM information_schema.foreign_data_wrapper_options ORDER BY 1, 2, 3;
foreign_data_wrapper_catalog | foreign_data_wrapper_name | option_name | option_value
@@ -2186,7 +2228,7 @@ NOTICE: drop cascades to server s0
\c
DROP ROLE regress_foreign_data_user;
-- At this point we should have no wrappers, no servers, and no mappings.
-SELECT fdwname, fdwhandler, fdwvalidator, fdwoptions FROM pg_foreign_data_wrapper;
+SELECT fdwname, fdwhandler, fdwvalidator, fdwoptions FROM pg_foreign_data_wrapper WHERE oid >= 16384;
fdwname | fdwhandler | fdwvalidator | fdwoptions
---------+------------+--------------+------------
(0 rows)
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index b15eddbff3..738f1b1678 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -144,6 +144,44 @@ ERROR: could not connect to the publisher: invalid port number: "-1"
ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
+-- temporarily revoke pg_create_connection from pg_create_subscription
+-- to test that CREATE SUBSCRIPTION ... CONNECTION fails
+RESET SESSION AUTHORIZATION;
+REVOKE pg_create_connection FROM pg_create_subscription;
+GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+-- fail - not a member of pg_create_connection, cannot use CONNECTION
+CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+ERROR: permission denied to create subscription with a connection string
+DETAIL: Only roles with privileges of the "pg_create_connection" role may create subscriptions with CONNECTION specified.
+HINT: Create a subscription to a foreign server by specifying SERVER instead.
+-- re-grant pg_create_connection to pg_create_subscription
+RESET SESSION AUTHORIZATION;
+GRANT pg_create_connection TO pg_create_subscription;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+WARNING: subscription was created, but is not connected
+HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+DROP SUBSCRIPTION regress_testsub6;
+-- test using a server object instead of connection string
+RESET SESSION AUTHORIZATION;
+CREATE SERVER regress_testserver FOREIGN DATA WRAPPER pg_connection_fdw;
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver
+ OPTIONS (password_required 'false');
+GRANT USAGE ON FOREIGN SERVER regress_testserver TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver PUBLICATION testpub
+ WITH (slot_name = NONE, connect = false);
+WARNING: subscription was created, but is not connected
+HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+RESET SESSION AUTHORIZATION;
+ALTER SUBSCRIPTION regress_testsub6 OWNER TO regress_subscription_user; -- fail, no user mapping
+ERROR: user mapping not found for user "regress_subscription_user", server "regress_testserver"
+DROP SUBSCRIPTION regress_testsub6;
+DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver;
+DROP SERVER regress_testserver;
+REVOKE CREATE ON DATABASE regression FROM regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user;
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN
diff --git a/src/test/regress/sql/foreign_data.sql b/src/test/regress/sql/foreign_data.sql
index eefb860adc..15a28de9ac 100644
--- a/src/test/regress/sql/foreign_data.sql
+++ b/src/test/regress/sql/foreign_data.sql
@@ -36,7 +36,7 @@ COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
-- At this point we should have 2 built-in wrappers and no servers.
-SELECT fdwname, fdwhandler::regproc, fdwvalidator::regproc, fdwoptions FROM pg_foreign_data_wrapper ORDER BY 1, 2, 3;
+SELECT fdwname, fdwhandler::regproc, fdwvalidator::regproc, fdwoptions FROM pg_foreign_data_wrapper WHERE oid >= 16384 ORDER BY 1, 2, 3;
SELECT srvname, srvoptions FROM pg_foreign_server;
SELECT * FROM pg_user_mapping;
@@ -180,6 +180,41 @@ CREATE SERVER t2 FOREIGN DATA WRAPPER foo;
RESET ROLE;
REVOKE regress_test_indirect FROM regress_test_role;
+-- test SERVER ... FOR CONNECTION ONLY
+
+SET ROLE regress_test_role;
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw; -- ERROR: not a member of pg_create_connection
+RESET ROLE;
+GRANT USAGE ON FOREIGN DATA WRAPPER pg_connection_fdw TO regress_test_role;
+SET ROLE regress_test_role;
+
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (client_encoding 'foo'); --fails
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (user 'foo'); --fails
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (password 'foo'); --fails
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (password_required 'true'); --fails
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw;
+
+IMPORT FOREIGN SCHEMA foo FROM SERVER t3 INTO bar; -- fails
+
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x'); -- fails
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password_required 'false'); -- fails
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', application_name 'nonsense'); -- fails
+
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+
+RESET ROLE;
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x'); -- still fails
+
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password_required 'false');
+
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+DROP SERVER t3;
+REVOKE USAGE ON FOREIGN DATA WRAPPER pg_connection_fdw FROM regress_test_role;
+
-- ALTER SERVER
ALTER SERVER s0; -- ERROR
ALTER SERVER s0 OPTIONS (a '1'); -- ERROR
@@ -453,7 +488,7 @@ ALTER FOREIGN TABLE IF EXISTS doesnt_exist_ft1 RENAME TO foreign_table_1;
-- Information schema
-SELECT * FROM information_schema.foreign_data_wrappers ORDER BY 1, 2;
+SELECT foreign_data_wrapper_catalog,foreign_data_wrapper_name FROM information_schema.foreign_data_wrappers ORDER BY 1, 2;
SELECT * FROM information_schema.foreign_data_wrapper_options ORDER BY 1, 2, 3;
SELECT * FROM information_schema.foreign_servers ORDER BY 1, 2;
SELECT * FROM information_schema.foreign_server_options ORDER BY 1, 2, 3;
@@ -861,6 +896,6 @@ DROP FOREIGN DATA WRAPPER dummy CASCADE;
DROP ROLE regress_foreign_data_user;
-- At this point we should have no wrappers, no servers, and no mappings.
-SELECT fdwname, fdwhandler, fdwvalidator, fdwoptions FROM pg_foreign_data_wrapper;
+SELECT fdwname, fdwhandler, fdwvalidator, fdwoptions FROM pg_foreign_data_wrapper WHERE oid >= 16384;
SELECT srvname, srvoptions FROM pg_foreign_server;
SELECT * FROM pg_user_mapping;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 444e563ff3..bef6bc8074 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -88,6 +88,45 @@ CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'port=-1' PUBLICATION testpub;
-- fail - invalid connection string during ALTER
ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
+-- temporarily revoke pg_create_connection from pg_create_subscription
+-- to test that CREATE SUBSCRIPTION ... CONNECTION fails
+RESET SESSION AUTHORIZATION;
+REVOKE pg_create_connection FROM pg_create_subscription;
+GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+
+-- fail - not a member of pg_create_connection, cannot use CONNECTION
+CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+
+-- re-grant pg_create_connection to pg_create_subscription
+RESET SESSION AUTHORIZATION;
+GRANT pg_create_connection TO pg_create_subscription;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+
+CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+DROP SUBSCRIPTION regress_testsub6;
+
+-- test using a server object instead of connection string
+
+RESET SESSION AUTHORIZATION;
+CREATE SERVER regress_testserver FOREIGN DATA WRAPPER pg_connection_fdw;
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver
+ OPTIONS (password_required 'false');
+GRANT USAGE ON FOREIGN SERVER regress_testserver TO regress_subscription_user3;
+
+SET SESSION AUTHORIZATION regress_subscription_user3;
+CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver PUBLICATION testpub
+ WITH (slot_name = NONE, connect = false);
+RESET SESSION AUTHORIZATION;
+
+ALTER SUBSCRIPTION regress_testsub6 OWNER TO regress_subscription_user; -- fail, no user mapping
+DROP SUBSCRIPTION regress_testsub6;
+
+DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver;
+DROP SERVER regress_testserver;
+REVOKE CREATE ON DATABASE regression FROM regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user;
+
\dRs+
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 16c7fb94eb..d12c182f16 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -27,6 +27,8 @@ $node_publisher->safe_psql('postgres',
"CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a");
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_ins2 AS SELECT generate_series(1,1002) AS a");
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a");
$node_publisher->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
@@ -65,6 +67,7 @@ $node_publisher->safe_psql('postgres',
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_notrep (a int)");
$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins2 (a int)");
$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int)");
$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
$node_subscriber->safe_psql('postgres',
@@ -110,6 +113,22 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only"
);
+my $publisher_host = $node_publisher->host;
+my $publisher_port = $node_publisher->port;
+$node_subscriber->safe_psql('postgres',
+ "CREATE SERVER tap_sub2_server FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')"
+);
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE USER MAPPING FOR PUBLIC SERVER tap_sub2_server OPTIONS (password_required 'false')"
+);
+
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_simple_pub FOR TABLE tab_ins2");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub2 SERVER tap_sub2_server PUBLICATION tap_simple_pub"
+);
+
# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
@@ -121,11 +140,22 @@ $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins");
is($result, qq(1002), 'check initial data was copied to subscriber');
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins2");
+is($result, qq(1002), 'check initial data was copied to subscriber via server subscription');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr'");
+
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_ins SELECT generate_series(1,50)");
$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20");
$node_publisher->safe_psql('postgres', "UPDATE tab_ins SET a = -a");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_ins2 SELECT generate_series(1,50)");
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub2 SERVER tap_sub2_server");
+
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rep SELECT generate_series(1,50)");
$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep WHERE a > 20");
@@ -158,6 +188,10 @@ $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_ins");
is($result, qq(1052|1|1002), 'check replicated inserts on subscriber');
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_ins2");
+is($result, qq(1052|1|1002), 'check replicated inserts on subscriber after switching between CONNECTION and SERVER');
+
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_rep");
is($result, qq(20|-20|-1), 'check replicated changes on subscriber');
@@ -449,10 +483,27 @@ $node_publisher->poll_query_until('postgres',
or die
"Timed out while waiting for apply to restart after changing PUBLICATION";
+# test that changing the options of the foreign server used by a
+# subscription (tap_sub2) causes its apply worker to restart
+$oldpid = $node_publisher->safe_psql('postgres',
+ "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub2' AND state = 'streaming';"
+);
+$node_subscriber->safe_psql('postgres',
+ "ALTER SERVER tap_sub2_server OPTIONS (sslmode 'disable')"
+);
+$node_publisher->poll_query_until('postgres',
+ "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub2' AND state = 'streaming';"
+ )
+ or die
+ "Timed out while waiting for apply to restart after changing SERVER options";
+
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_ins SELECT generate_series(1001,1100)");
$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_ins2 SELECT generate_series(1001,1100)");
+
# Restart the publisher and check the state of the subscriber which
# should be in a streaming state after catching up.
$node_publisher->stop('fast');
@@ -465,6 +516,11 @@ $result = $node_subscriber->safe_psql('postgres',
is($result, qq(1152|1|1100),
'check replicated inserts after subscription publication change');
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_ins2");
+is($result, qq(1152|1|1100),
+ 'check replicated inserts for server subscription after foreign server option change');
+
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_rep");
is($result, qq(20|-20|-1),
@@ -533,6 +589,7 @@ $node_publisher->poll_query_until('postgres',
# check all the cleanup
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed");
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub2");
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_subscription");
--
2.34.1