On Tue, 2023-09-05 at 12:08 -0700, Jeff Davis wrote:
> OK, so we could have a built-in FDW called pg_connection that would do
> the right kinds of validation; and then also allow other FDWs but the
> subscription would have to do its own validation.
Attached is a rough rebased version implementing the above, with a pg_connection_fdw foreign data wrapper built in.

Regards,
Jeff Davis
From 776cd8e5e1541c56b1767aa595fc609fdeffa5e3 Mon Sep 17 00:00:00 2001 From: Jeff Davis <j...@j-davis.com> Date: Wed, 23 Aug 2023 10:31:16 -0700 Subject: [PATCH v3] CREATE SUBSCRIPTION ... SERVER. --- doc/src/sgml/ref/alter_subscription.sgml | 7 +- doc/src/sgml/ref/create_subscription.sgml | 15 +- doc/src/sgml/user-manag.sgml | 21 +- src/backend/catalog/Makefile | 1 + src/backend/catalog/pg_subscription.c | 17 +- src/backend/catalog/system_functions.sql | 2 + src/backend/commands/subscriptioncmds.c | 197 ++++++++++++++-- src/backend/foreign/foreign.c | 214 ++++++++++++++++++ src/backend/parser/gram.y | 20 ++ src/backend/replication/logical/worker.c | 12 +- src/bin/pg_dump/pg_dump.c | 59 ++++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 2 +- src/bin/psql/tab-complete.c | 2 +- src/include/catalog/meson.build | 1 + src/include/catalog/pg_authid.dat | 5 + .../catalog/pg_foreign_data_wrapper.dat | 22 ++ src/include/catalog/pg_proc.dat | 8 + src/include/catalog/pg_subscription.h | 5 +- src/include/foreign/foreign.h | 1 + src/include/nodes/parsenodes.h | 3 + src/test/regress/expected/foreign_data.out | 60 ++++- src/test/regress/expected/subscription.out | 38 ++++ src/test/regress/sql/foreign_data.sql | 41 +++- src/test/regress/sql/subscription.sql | 39 ++++ src/test/subscription/t/001_rep_changes.pl | 57 +++++ 26 files changed, 799 insertions(+), 51 deletions(-) create mode 100644 src/include/catalog/pg_foreign_data_wrapper.dat diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 6d36ff0dc9..f2235061bb 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -21,6 +21,7 @@ PostgreSQL documentation <refsynopsisdiv> <synopsis> +ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SERVER <replaceable>servername</replaceable> ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>' ALTER 
SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] @@ -98,9 +99,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term> <listitem> <para> - This clause replaces the connection string originally set by - <xref linkend="sql-createsubscription"/>. See there for more - information. + This clause replaces the foreign server or connection string originally + set by <xref linkend="sql-createsubscription"/> with the connection + string <replaceable>conninfo</replaceable>. </para> </listitem> </varlistentry> diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index f1c20b3a46..cd76b2e32d 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -22,7 +22,7 @@ PostgreSQL documentation <refsynopsisdiv> <synopsis> CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceable> - CONNECTION '<replaceable class="parameter">conninfo</replaceable>' + { SERVER <replaceable class="parameter">servername</replaceable> | CONNECTION '<replaceable class="parameter">conninfo</replaceable>' } PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... 
] ) ] </synopsis> @@ -77,6 +77,15 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl </listitem> </varlistentry> + <varlistentry id="sql-createsubscription-params-server"> + <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term> + <listitem> + <para> + A foreign server to use for the connection. + </para> + </listitem> + </varlistentry> + <varlistentry id="sql-createsubscription-params-connection"> <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term> <listitem> @@ -363,6 +372,10 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl The default is <literal>true</literal>. Only superusers can set this value to <literal>false</literal>. </para> + <para> + Only allowed when <literal>CONNECTION</literal> is + specified. Otherwise, see <xref linkend="sql-createusermapping"/>. + </para> </listitem> </varlistentry> diff --git a/doc/src/sgml/user-manag.sgml b/doc/src/sgml/user-manag.sgml index 92a299d2d3..d63a33a4b3 100644 --- a/doc/src/sgml/user-manag.sgml +++ b/doc/src/sgml/user-manag.sgml @@ -687,11 +687,20 @@ DROP ROLE doomed_role; <entry>Allow use of connection slots reserved via <xref linkend="guc-reserved-connections"/>.</entry> </row> + <row> + <entry>pg_create_connection</entry> + <entry>Allow users with <literal>CREATE</literal> permission on the + database to issue <link linkend="sql-createserver"><command>CREATE + SERVER</command></link> if <literal>FOR CONNECTION ONLY</literal> is + specified.</entry> + </row> <row> <entry>pg_create_subscription</entry> <entry>Allow users with <literal>CREATE</literal> permission on the - database to issue - <link linkend="sql-createsubscription"><command>CREATE SUBSCRIPTION</command></link>.</entry> + database to issue <link + linkend="sql-createsubscription"><command>CREATE + SUBSCRIPTION</command></link>. 
This role is a member of + <literal>pg_create_connection</literal>.</entry> </row> </tbody> </tgroup> @@ -737,6 +746,14 @@ DROP ROLE doomed_role; great care should be taken when granting these roles to users. </para> + <para> + The <literal>pg_create_subscription</literal> role is a member of + <literal>pg_create_connection</literal>. It may be useful to revoke that + membership in order to permit roles to create subscriptions only to a + foreign server, without allowing them to specify a connection string + directly. + </para> + <para> Care should be taken when granting these roles to ensure they are only used where needed and with the understanding that these roles grant access to privileged diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index ec7b6f5362..365c956dea 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -136,6 +136,7 @@ POSTGRES_BKI_DATA = $(addprefix $(top_srcdir)/src/include/catalog/,\ pg_collation.dat \ pg_conversion.dat \ pg_database.dat \ + pg_foreign_data_wrapper.dat \ pg_language.dat \ pg_namespace.dat \ pg_opclass.dat \ diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index d6a978f136..f5c4ec8d99 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -23,6 +23,7 @@ #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" +#include "foreign/foreign.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "storage/lmgr.h" @@ -75,10 +76,18 @@ GetSubscription(Oid subid, bool missing_ok) sub->runasowner = subform->subrunasowner; /* Get conninfo */ - datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, - tup, - Anum_pg_subscription_subconninfo); - sub->conninfo = TextDatumGetCString(datum); + if (OidIsValid(subform->subserver)) + { + sub->conninfo = ForeignServerConnectionString(subform->subowner, + subform->subserver); + } + else + { + datum = 
SysCacheGetAttrNotNull(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_subconninfo); + sub->conninfo = TextDatumGetCString(datum); + } /* Get slotname */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 4206752881..e1abda41ba 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -781,3 +781,5 @@ GRANT pg_read_all_settings TO pg_monitor; GRANT pg_read_all_stats TO pg_monitor; GRANT pg_stat_scan_tables TO pg_monitor; + +GRANT pg_create_connection TO pg_create_subscription; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index edc82c11be..c46900182e 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -25,14 +25,17 @@ #include "catalog/objectaddress.h" #include "catalog/pg_authid_d.h" #include "catalog/pg_database_d.h" +#include "catalog/pg_foreign_server.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" +#include "catalog/pg_user_mapping.h" #include "commands/dbcommands.h" #include "commands/defrem.h" #include "commands/event_trigger.h" #include "commands/subscriptioncmds.h" #include "executor/executor.h" +#include "foreign/foreign.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "pgstat.h" @@ -574,6 +577,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, Datum values[Natts_pg_subscription]; Oid owner = GetUserId(); HeapTuple tup; + Oid serverid; + Oid umid; char *conninfo; char originname[NAMEDATALEN]; List *publications; @@ -594,6 +599,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); + if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED) && stmt->servername) + ereport(ERROR, + 
(errcode(ERRCODE_SYNTAX_ERROR), + errmsg("option \"password_required\" invalid on subscriptions to a foreign server"), + errhint("Use the \"password_required\" option on the user mappings associated with the foreign server."))); + /* * Since creating a replication slot is not transactional, rolling back * the transaction leaves the created replication slot. So we cannot run @@ -604,9 +615,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)"); /* - * We don't want to allow unprivileged users to be able to trigger - * attempts to access arbitrary network destinations, so require the user - * to have been specifically authorized to create subscriptions. + * We don't want to allow unprivileged users to utilize the resources that + * a subscription requires (such as a background worker), so require the + * user to have been specifically authorized to create subscriptions. */ if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION)) ereport(ERROR, @@ -666,14 +677,54 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, if (opts.synchronous_commit == NULL) opts.synchronous_commit = "off"; - conninfo = stmt->conninfo; - publications = stmt->publication; - /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); - /* Check the connection info string. 
*/ - walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser()); + if (stmt->servername) + { + ForeignServer *server; + UserMapping *um; + + Assert(!stmt->conninfo); + conninfo = NULL; + + server = GetForeignServerByName(stmt->servername, false); + aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername); + + um = GetUserMapping(owner, server->serverid); + + serverid = server->serverid; + umid = um->umid; + conninfo = ForeignServerConnectionString(owner, serverid); + } + else + { + Assert(stmt->conninfo); + + /* + * We don't want to allow unprivileged users to be able to trigger + * attempts to access arbitrary network destinations, so require the user + * to have been specifically authorized to create connections. + */ + if (!has_privs_of_role(owner, ROLE_PG_CREATE_CONNECTION)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied to create subscription with a connection string"), + errdetail("Only roles with privileges of the \"%s\" role may create subscriptions with CONNECTION specified.", + "pg_create_connection"), + errhint("Create a subscription to a foreign server by specifying SERVER instead."))); + + /* Check the connection info string. */ + walrcv_check_conninfo(stmt->conninfo, opts.passwordrequired && !superuser()); + + serverid = InvalidOid; + umid = InvalidOid; + conninfo = stmt->conninfo; + } + + publications = stmt->publication; /* Everything ok, form a new tuple. 
*/ memset(values, 0, sizeof(values)); @@ -697,8 +748,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired); values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner); - values[Anum_pg_subscription_subconninfo - 1] = - CStringGetTextDatum(conninfo); + values[Anum_pg_subscription_subserver - 1] = serverid; + if (!OidIsValid(serverid)) + values[Anum_pg_subscription_subconninfo - 1] = + CStringGetTextDatum(conninfo); + else + nulls[Anum_pg_subscription_subconninfo - 1] = true; if (opts.slot_name) values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name)); @@ -719,6 +774,20 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, recordDependencyOnOwner(SubscriptionRelationId, subid, owner); + ObjectAddressSet(myself, SubscriptionRelationId, subid); + + if (stmt->servername) + { + ObjectAddress referenced; + Assert(OidIsValid(serverid) && OidIsValid(umid)); + + ObjectAddressSet(referenced, ForeignServerRelationId, serverid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + + ObjectAddressSet(referenced, UserMappingRelationId, umid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + } + ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_create(originname); @@ -835,8 +904,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, if (opts.enabled) ApplyLauncherWakeupAtCommit(); - ObjectAddressSet(myself, SubscriptionRelationId, subid); - InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0); return myself; @@ -1124,6 +1191,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, memset(nulls, false, sizeof(nulls)); memset(replaces, false, sizeof(replaces)); + ObjectAddressSet(myself, 
SubscriptionRelationId, subid); + switch (stmt->kind) { case ALTER_SUBSCRIPTION_OPTIONS: @@ -1191,6 +1260,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED)) { + if (OidIsValid(form->subserver)) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("option \"password_required\" invalid on subscriptions to a foreign server"), + errhint("Use the \"password_required\" option on the user mappings associated with the foreign server."))); + /* Non-superuser may not disable password_required. */ if (!opts.passwordrequired && !superuser()) ereport(ERROR, @@ -1244,7 +1319,77 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, break; } + case ALTER_SUBSCRIPTION_SERVER: + { + ForeignServer *new_server; + UserMapping *new_um; + ObjectAddress referenced; + AclResult aclresult; + + /* + * Remove what was there before, either another foreign server + * or a connection string. + */ + if (form->subserver) + { + UserMapping *old_um = GetUserMapping(form->subowner, form->subserver); + + deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid, + DEPENDENCY_NORMAL, + ForeignServerRelationId, form->subserver); + deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid, + DEPENDENCY_NORMAL, + UserMappingRelationId, old_um->umid); + } + else + { + nulls[Anum_pg_subscription_subconninfo - 1] = true; + replaces[Anum_pg_subscription_subconninfo - 1] = true; + } + + /* + * Find the new server and user mapping. Check ACL of server + * based on current user ID, but find the user mapping based + * on the subscription owner. 
+ */ + new_server = GetForeignServerByName(stmt->servername, false); + aclresult = object_aclcheck(ForeignServerRelationId, + new_server->serverid, GetUserId(), ACL_USAGE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, + new_server->servername); + + new_um = GetUserMapping(form->subowner, new_server->serverid); + + values[Anum_pg_subscription_subserver - 1] = new_server->serverid; + replaces[Anum_pg_subscription_subserver - 1] = true; + + ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + + ObjectAddressSet(referenced, UserMappingRelationId, new_um->umid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + update_tuple = true; + } + break; + case ALTER_SUBSCRIPTION_CONNECTION: + /* remove reference to foreign server and dependencies, if present */ + if (form->subserver) + { + UserMapping *old_um = GetUserMapping(form->subowner, form->subserver); + + deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid, + DEPENDENCY_NORMAL, + ForeignServerRelationId, form->subserver); + deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid, + DEPENDENCY_NORMAL, + UserMappingRelationId, old_um->umid); + + values[Anum_pg_subscription_subserver - 1] = InvalidOid; + replaces[Anum_pg_subscription_subserver - 1] = true; + } + /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); /* Check the connection info string. */ @@ -1455,8 +1600,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, table_close(rel, RowExclusiveLock); - ObjectAddressSet(myself, SubscriptionRelationId, subid); - InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0); /* Wake up related replication workers to handle this change quickly. 
*/ @@ -1541,9 +1684,17 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) subname = pstrdup(NameStr(*DatumGetName(datum))); /* Get conninfo */ - datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup, - Anum_pg_subscription_subconninfo); - conninfo = TextDatumGetCString(datum); + if (OidIsValid(form->subserver)) + { + conninfo = ForeignServerConnectionString(form->subowner, + form->subserver); + } + else + { + datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup, + Anum_pg_subscription_subconninfo); + conninfo = TextDatumGetCString(datum); + } /* Get slotname */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, @@ -1644,6 +1795,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) } /* Clean up dependencies */ + deleteDependencyRecordsFor(SubscriptionRelationId, subid, false); deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); /* Remove any associated relation synchronization states. */ @@ -1853,6 +2005,17 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) aclcheck_error(aclresult, OBJECT_DATABASE, get_database_name(MyDatabaseId)); + if (form->subserver) + { + UserMapping *old_um = GetUserMapping(form->subowner, form->subserver); + UserMapping *new_um = GetUserMapping(newOwnerId, form->subserver); + + if (changeDependencyFor(SubscriptionRelationId, form->oid, + UserMappingRelationId, old_um->umid, new_um->umid) != 1) + elog(ERROR, "could not change user mapping dependency for subscription %u", + form->oid); + } + form->subowner = newOwnerId; CatalogTupleUpdate(rel, &tup->t_self, tup); diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c index fc3edef2a8..5a800fc48f 100644 --- a/src/backend/foreign/foreign.c +++ b/src/backend/foreign/foreign.c @@ -18,11 +18,14 @@ #include "catalog/pg_foreign_server.h" #include "catalog/pg_foreign_table.h" #include "catalog/pg_user_mapping.h" +#include "commands/defrem.h" #include "foreign/fdwapi.h" #include "foreign/foreign.h" #include "funcapi.h" 
#include "lib/stringinfo.h" +#include "mb/pg_wchar.h" #include "miscadmin.h" +#include "replication/walreceiver.h" #include "utils/builtins.h" #include "utils/memutils.h" #include "utils/rel.h" @@ -190,6 +193,94 @@ GetForeignServerByName(const char *srvname, bool missing_ok) } +/* + * Escape a connection option value. Helper for options_to_connstr(). + */ +static char * +escape_value(char *val) +{ + StringInfoData result; + + initStringInfo(&result); + + for (int i = 0; val[i] != '\0'; i++) + { + if (val[i] == '\\' || val[i] == '\'') + appendStringInfoChar(&result, '\\'); + appendStringInfoChar(&result, val[i]); + } + + return result.data; +} + + +/* + * Helper for ForeignServerConnectionString() and pg_connection_validator(). + * + * Transform a List of DefElem into a connection string. + * + * XXX: might leak memory, investigate + */ +static char * +options_to_connstr(List *options) +{ + StringInfoData connstr; + ListCell *lc; + bool first = true; + + initStringInfo(&connstr); + foreach(lc, options) + { + DefElem *d = (DefElem *) lfirst(lc); + char *name = d->defname; + char *value; + + /* not a libpq option; skip */ + if (strcmp(name, "password_required") == 0) + continue; + + /* XXX: pfree() result of defGetString() if needed? */ + value = escape_value(defGetString(d)); + + appendStringInfo(&connstr, "%s%s = '%s'", + first ? "" : " ", name, value); + first = false; + + pfree(value); + } + + /* override client_encoding */ + appendStringInfo(&connstr, "%sclient_encoding = '%s'", + first ? "" : " ", GetDatabaseEncodingName()); + + return connstr.data; +} + + +/* + * Given a user ID and server ID, return a postgres connection string suitable + * to pass to libpq. 
+ * + * XXX: might leak memory, investigate + */ +char * +ForeignServerConnectionString(Oid userid, Oid serverid) +{ + ForeignServer *server = GetForeignServer(serverid); + UserMapping *um = GetUserMapping(userid, serverid); + List *options = list_concat(um->options, server->options); + char *connstr; + + connstr = options_to_connstr(options); + + pfree(server); + pfree(um); + list_free(options); + + return connstr; +} + + /* * GetUserMapping - look up the user mapping. * @@ -599,6 +690,129 @@ is_conninfo_option(const char *option, Oid context) return false; } +/* + * pg_connection_handler + * + * pg_connection_fdw is not used for foreign tables, so the handler should + * never be called. + */ +Datum +pg_connection_handler(PG_FUNCTION_ARGS) +{ + elog(ERROR, "pg_connection_handler unexpectedly called"); +} + +/* + * Option validator for CREATE SERVER ... FOR CONNECTION ONLY. + * + * XXX: try to unify with validators for CREATE SUBSCRIPTION ... CONNECTION, + * postgres_fdw, and dblink. Also investigate if memory leaks are a problem + * here. + */ +Datum +pg_connection_validator(PG_FUNCTION_ARGS) +{ + List *options_list = untransformRelOptions(PG_GETARG_DATUM(0)); + Oid catalog = PG_GETARG_OID(1); + + if (catalog == ForeignServerRelationId) + { + char *conninfo; + ListCell *lc; + + foreach(lc, options_list) + { + DefElem *d = (DefElem *) lfirst(lc); + + if (strcmp(d->defname, "client_encoding") == 0) + ereport(ERROR, + (errmsg("cannot specify client_encoding for pg_connection_fdw"))); + + if (strcmp(d->defname, "user") == 0 || + strcmp(d->defname, "password") == 0 || + strcmp(d->defname, "sslpassword") == 0 || + strcmp(d->defname, "password_required") == 0) + ereport(ERROR, + (errmsg("invalid option \"%s\" for pg_connection_fdw", + d->defname), + errhint("Specify option \"%s\" for a user mapping associated with the server instead.", + d->defname))); + } + + conninfo = options_to_connstr(options_list); + + /* Load the library providing us libpq calls. 
*/ + load_file("libpqwalreceiver", false); + + walrcv_check_conninfo(conninfo, false); + } + else if (catalog == UserMappingRelationId) + { + bool password_required = true; + bool password_provided = false; + ListCell *lc; + + foreach(lc, options_list) + { + DefElem *d = (DefElem *) lfirst(lc); + + if (strcmp(d->defname, "password_required") == 0) + { + /* + * Only the superuser may set this option on a user mapping, or + * alter a user mapping on which this option is set. We allow a + * user to clear this option if it's set - in fact, we don't have + * a choice since we can't see the old mapping when validating an + * alter. + */ + if (!superuser() && !defGetBoolean(d)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("password_required=false is superuser-only for pg_connection_fdw"), + errhint("User mappings with the password_required option set to false may only be created or modified by the superuser."))); + + password_required = defGetBoolean(d); + } + + if ((strcmp(d->defname, "sslkey") == 0 || strcmp(d->defname, "sslcert") == 0) && !superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("sslcert and sslkey are superuser-only for pg_connection_fdw"), + errhint("User mappings with the sslcert or sslkey options set may only be created or modified by the superuser."))); + + if (strcmp(d->defname, "password") == 0) + password_provided = true; + + if (strcmp(d->defname, "user") != 0 && + strcmp(d->defname, "password") != 0 && + strcmp(d->defname, "sslpassword") != 0 && + strcmp(d->defname, "sslkey") != 0 && + strcmp(d->defname, "sslcert") != 0 && + strcmp(d->defname, "password_required") != 0) + ereport(ERROR, + (errmsg("invalid user mapping option \"%s\" for pg_connection_fdw", + d->defname))); + } + + if (password_required && !password_provided) + ereport(ERROR, + (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), + errmsg("password is required"), + errdetail("Non-superusers must provide a password in the 
connection string."))); + } + else if (catalog == ForeignTableRelationId) + ereport(ERROR, + (errmsg("cannot create foreign table using a pg_connection_fdw server"), + errhint("Use the server with CREATE SUBSCRIPTION instead."))); + else if (catalog == AttributeRelationId) + elog(ERROR, "unexpected call to pg_connection_validator for pg_attribute catalog"); + else + elog(ERROR, "unexpected call to pg_connection_validator for catalog %d", catalog); + + + PG_RETURN_BOOL(true); +} + /* * Validate the generic option given to SERVER or USER MAPPING. diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 63f172e175..259b5ca42b 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -10638,6 +10638,16 @@ CreateSubscriptionStmt: n->options = $8; $$ = (Node *) n; } + | CREATE SUBSCRIPTION name SERVER name PUBLICATION name_list opt_definition + { + CreateSubscriptionStmt *n = + makeNode(CreateSubscriptionStmt); + n->subname = $3; + n->servername = $5; + n->publication = $7; + n->options = $8; + $$ = (Node *) n; + } ; /***************************************************************************** @@ -10667,6 +10677,16 @@ AlterSubscriptionStmt: n->conninfo = $5; $$ = (Node *) n; } + | ALTER SUBSCRIPTION name SERVER name + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + + n->kind = ALTER_SUBSCRIPTION_SERVER; + n->subname = $3; + n->servername = $5; + $$ = (Node *) n; + } | ALTER SUBSCRIPTION name REFRESH PUBLICATION opt_definition { AlterSubscriptionStmt *n = diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 21abf34ef7..be63ab5a5d 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4003,7 +4003,9 @@ maybe_reread_subscription(void) } /* - * Callback from subscription syscache invalidation. + * Callback from subscription syscache invalidation. 
Also needed for server or + * user mapping invalidation, which can change the connection information for + * subscriptions that connect using a server object. */ static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue) @@ -4639,6 +4641,14 @@ InitializeLogRepWorker(void) CacheRegisterSyscacheCallback(SUBSCRIPTIONOID, subscription_change_cb, (Datum) 0); + /* Keep us informed about subscription changes. */ + CacheRegisterSyscacheCallback(FOREIGNSERVEROID, + subscription_change_cb, + (Datum) 0); + /* Keep us informed about subscription changes. */ + CacheRegisterSyscacheCallback(USERMAPPINGOID, + subscription_change_cb, + (Datum) 0); CacheRegisterSyscacheCallback(AUTHOID, subscription_change_cb, diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 050a831226..d603feec6e 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -2066,6 +2066,27 @@ selectDumpableStatisticsObject(StatsExtInfo *sobj, Archive *fout) sobj->dobj.dump = DUMP_COMPONENT_NONE; } +/* + * selectDumpableAccessMethod: policy-setting subroutine + * Mark an access method as to be dumped or not + * + * Access methods do not belong to any particular namespace. To identify + * built-in access methods, we must resort to checking whether the + * method's OID is in the range reserved for initdb. + */ +static void +selectDumpableFdw(FdwInfo *fdwinfo, Archive *fout) +{ + if (checkExtensionMembership(&fdwinfo->dobj, fout)) + return; /* extension membership overrides all else */ + + if (fdwinfo->dobj.catId.oid <= (Oid) g_last_builtin_oid) + fdwinfo->dobj.dump = DUMP_COMPONENT_NONE; + else + fdwinfo->dobj.dump = fout->dopt->include_everything ? 
+ DUMP_COMPONENT_ALL : DUMP_COMPONENT_NONE; +} + /* * selectDumpableObject: policy-setting subroutine * Mark a generic dumpable object as to be dumped or not @@ -4633,6 +4654,7 @@ getSubscriptions(Archive *fout) int i_subdisableonerr; int i_subpasswordrequired; int i_subrunasowner; + int i_subservername; int i_subconninfo; int i_subslotname; int i_subsynccommit; @@ -4701,10 +4723,19 @@ getSubscriptions(Archive *fout) " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY); - appendPQExpBufferStr(query, - "FROM pg_subscription s\n" - "WHERE s.subdbid = (SELECT oid FROM pg_database\n" - " WHERE datname = current_database())"); + if (fout->remoteVersion >= 170000) + appendPQExpBufferStr(query, + " fs.srvname AS subservername\n" + "FROM pg_subscription s LEFT JOIN pg_foreign_server fs\n" + " ON (s.subserver = fs.oid)\n" + "WHERE s.subdbid = (SELECT oid FROM pg_database\n" + " WHERE datname = current_database())"); + else + appendPQExpBufferStr(query, + " NULL AS subservername\n" + "FROM pg_subscription s\n" + "WHERE s.subdbid = (SELECT oid FROM pg_database\n" + " WHERE datname = current_database())"); res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); @@ -4724,6 +4755,7 @@ getSubscriptions(Archive *fout) i_subdisableonerr = PQfnumber(res, "subdisableonerr"); i_subpasswordrequired = PQfnumber(res, "subpasswordrequired"); i_subrunasowner = PQfnumber(res, "subrunasowner"); + i_subservername = PQfnumber(res, "subservername"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); i_subsynccommit = PQfnumber(res, "subsynccommit"); @@ -4741,7 +4773,10 @@ getSubscriptions(Archive *fout) AssignDumpId(&subinfo[i].dobj); subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname)); subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner)); - + if (PQgetisnull(res, i, i_subservername)) + subinfo[i].subservername = NULL; + else + subinfo[i].subservername = pg_strdup(PQgetvalue(res, i, i_subservername)); subinfo[i].subbinary = 
pg_strdup(PQgetvalue(res, i, i_subbinary)); subinfo[i].substream = @@ -4804,9 +4839,17 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n", qsubname); - appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ", + appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s ", qsubname); - appendStringLiteralAH(query, subinfo->subconninfo, fout); + if (subinfo->subservername) + { + appendPQExpBuffer(query, "SERVER %s", fmtId(subinfo->subservername)); + } + else + { + appendPQExpBuffer(query, "CONNECTION "); + appendStringLiteralAH(query, subinfo->subconninfo, fout); + } /* Build list of quoted publications and append them to query. */ if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames)) @@ -9513,7 +9556,7 @@ getForeignDataWrappers(Archive *fout, int *numForeignDataWrappers) fdwinfo[i].fdwoptions = pg_strdup(PQgetvalue(res, i, i_fdwoptions)); /* Decide whether we want to dump it */ - selectDumpableObject(&(fdwinfo[i].dobj), fout); + selectDumpableFdw(&fdwinfo[i], fout); /* Mark whether FDW has an ACL */ if (!PQgetisnull(res, i, i_fdwacl)) diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 673ca5c92d..0a94b9b7ae 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -667,6 +667,7 @@ typedef struct _SubscriptionInfo char *subdisableonerr; char *subpasswordrequired; char *subrunasowner; + char *subservername; char *subconninfo; char *subslotname; char *subsynccommit; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 5077e7b358..fb4ea96968 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5793,7 +5793,7 @@ listForeignDataWrappers(const char *pattern, bool verbose) gettext_noop("Description")); } - appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_foreign_data_wrapper fdw\n"); + appendPQExpBufferStr(&buf, "\nFROM (SELECT tableoid, * FROM pg_catalog.pg_foreign_data_wrapper WHERE oid>=16384) fdw\n"); if (verbose) 
appendPQExpBufferStr(&buf, diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 049801186c..230683a850 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -3314,7 +3314,7 @@ psql_completion(const char *text, int start, int end) /* CREATE SUBSCRIPTION */ else if (Matches("CREATE", "SUBSCRIPTION", MatchAny)) - COMPLETE_WITH("CONNECTION"); + COMPLETE_WITH("SERVER", "CONNECTION"); else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", MatchAny)) COMPLETE_WITH("PUBLICATION"); else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", diff --git a/src/include/catalog/meson.build b/src/include/catalog/meson.build index dcb3c5f766..01e92089bf 100644 --- a/src/include/catalog/meson.build +++ b/src/include/catalog/meson.build @@ -83,6 +83,7 @@ bki_data = [ 'pg_collation.dat', 'pg_conversion.dat', 'pg_database.dat', + 'pg_foreign_data_wrapper.dat', 'pg_language.dat', 'pg_namespace.dat', 'pg_opclass.dat', diff --git a/src/include/catalog/pg_authid.dat b/src/include/catalog/pg_authid.dat index 6b4a0aaaad..8fce457ab1 100644 --- a/src/include/catalog/pg_authid.dat +++ b/src/include/catalog/pg_authid.dat @@ -94,5 +94,10 @@ rolcreaterole => 'f', rolcreatedb => 'f', rolcanlogin => 'f', rolreplication => 'f', rolbypassrls => 'f', rolconnlimit => '-1', rolpassword => '_null_', rolvaliduntil => '_null_' }, +{ oid => '6123', oid_symbol => 'ROLE_PG_CREATE_CONNECTION', + rolname => 'pg_create_connection', rolsuper => 'f', rolinherit => 't', + rolcreaterole => 'f', rolcreatedb => 'f', rolcanlogin => 'f', + rolreplication => 'f', rolbypassrls => 'f', rolconnlimit => '-1', + rolpassword => '_null_', rolvaliduntil => '_null_' }, ] diff --git a/src/include/catalog/pg_foreign_data_wrapper.dat b/src/include/catalog/pg_foreign_data_wrapper.dat new file mode 100644 index 0000000000..7d489bf849 --- /dev/null +++ b/src/include/catalog/pg_foreign_data_wrapper.dat @@ -0,0 +1,22 @@ 
+#---------------------------------------------------------------------- +# +# pg_foreign_data_wrapper.dat +# Initial contents of the pg_foreign_data_wrapper system catalog. +# +# Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group +# Portions Copyright (c) 1994, Regents of the University of California +# +# src/include/catalog/pg_foreign_data_wrapper.dat +# +#---------------------------------------------------------------------- + +[ + +{ oid => '6015', oid_symbol => 'PG_CONNECTION_FDW', + descr => 'Pseudo FDW for connections to Postgres', + fdwname => 'pg_connection_fdw', fdwowner => 'POSTGRES', + fdwhandler => 'pg_connection_handler', + fdwvalidator => 'pg_connection_validator', + fdwacl => '_null_', fdwoptions => '_null_'}, + +] diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9052f5262a..8e81e5f6bb 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -7516,6 +7516,14 @@ proname => 'postgresql_fdw_validator', prorettype => 'bool', proargtypes => '_text oid', prosrc => 'postgresql_fdw_validator' }, +{ oid => '6122', descr => '(internal)', + proname => 'pg_connection_handler', prorettype => 'fdw_handler', + proargtypes => '', prosrc => 'pg_connection_handler' }, + +{ oid => '6124', descr => '(internal)', + proname => 'pg_connection_validator', prorettype => 'bool', + proargtypes => '_text oid', prosrc => 'pg_connection_validator' }, + { oid => '2290', descr => 'I/O', proname => 'record_in', provolatile => 's', prorettype => 'record', proargtypes => 'cstring oid int4', prosrc => 'record_in' }, diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index e0b91eacd2..4b83c8ca8b 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -93,9 +93,12 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subrunasowner; /* True if replication should execute as the * 
subscription owner */ + Oid subserver; /* Set if connecting with server */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ - text subconninfo BKI_FORCE_NOT_NULL; + text subconninfo BKI_FORCE_NULL; /* Set if connecting with + connection string */ /* Slot name on publisher */ NameData subslotname BKI_FORCE_NULL; diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h index 5256d4d91f..7058335d63 100644 --- a/src/include/foreign/foreign.h +++ b/src/include/foreign/foreign.h @@ -69,6 +69,7 @@ extern ForeignServer *GetForeignServerExtended(Oid serverid, bits16 flags); extern ForeignServer *GetForeignServerByName(const char *srvname, bool missing_ok); +extern char *ForeignServerConnectionString(Oid userid, Oid serverid); extern UserMapping *GetUserMapping(Oid userid, Oid serverid); extern ForeignDataWrapper *GetForeignDataWrapper(Oid fdwid); extern ForeignDataWrapper *GetForeignDataWrapperExtended(Oid fdwid, diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index e494309da8..4b316d4664 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4040,6 +4040,7 @@ typedef struct CreateSubscriptionStmt { NodeTag type; char *subname; /* Name of the subscription */ + char *servername; /* Server name of publisher */ char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ @@ -4048,6 +4049,7 @@ typedef struct CreateSubscriptionStmt typedef enum AlterSubscriptionType { ALTER_SUBSCRIPTION_OPTIONS, + ALTER_SUBSCRIPTION_SERVER, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_ADD_PUBLICATION, @@ -4062,6 +4064,7 @@ typedef struct AlterSubscriptionStmt NodeTag type; AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */ char *subname; /* Name of the subscription */ + char *servername; /* Server name of 
publisher */ char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ diff --git a/src/test/regress/expected/foreign_data.out b/src/test/regress/expected/foreign_data.out index 1dfe23cc1e..b551405a3d 100644 --- a/src/test/regress/expected/foreign_data.out +++ b/src/test/regress/expected/foreign_data.out @@ -25,7 +25,7 @@ CREATE FOREIGN DATA WRAPPER dummy; COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless'; CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator; -- At this point we should have 2 built-in wrappers and no servers. -SELECT fdwname, fdwhandler::regproc, fdwvalidator::regproc, fdwoptions FROM pg_foreign_data_wrapper ORDER BY 1, 2, 3; +SELECT fdwname, fdwhandler::regproc, fdwvalidator::regproc, fdwoptions FROM pg_foreign_data_wrapper WHERE oid >= 16384 ORDER BY 1, 2, 3; fdwname | fdwhandler | fdwvalidator | fdwoptions ------------+------------+--------------------------+------------ dummy | - | - | @@ -394,6 +394,47 @@ CREATE SERVER t2 FOREIGN DATA WRAPPER foo; RESET ROLE; REVOKE regress_test_indirect FROM regress_test_role; +-- test SERVER ... FOR CONNECTION ONLY +SET ROLE regress_test_role; +CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw; -- ERROR: not a member of pg_create_connection +ERROR: permission denied for foreign-data wrapper pg_connection_fdw +RESET ROLE; +GRANT USAGE ON FOREIGN DATA WRAPPER pg_connection_fdw TO regress_test_role; +SET ROLE regress_test_role; +CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (client_encoding 'foo'); --fails +ERROR: cannot specify client_encoding for pg_connection_fdw +CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (user 'foo'); --fails +ERROR: invalid option "user" for pg_connection_fdw +HINT: Specify option "user" for a user mapping associated with the server instead. 
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (password 'foo'); --fails +ERROR: invalid option "password" for pg_connection_fdw +HINT: Specify option "password" for a user mapping associated with the server instead. +CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (password_required 'true'); --fails +ERROR: invalid option "password_required" for pg_connection_fdw +HINT: Specify option "password_required" for a user mapping associated with the server instead. +CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw; +IMPORT FOREIGN SCHEMA foo FROM SERVER t3 INTO bar; -- fails +ERROR: schema "bar" does not exist +CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x'); -- fails +ERROR: password is required +DETAIL: Non-superusers must provide a password in the connection string. +CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password_required 'false'); -- fails +ERROR: password_required=false is superuser-only for pg_connection_fdw +HINT: User mappings with the password_required option set to false may only be created or modified by the superuser. +CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', application_name 'nonsense'); -- fails +ERROR: invalid user mapping option "application_name" for pg_connection_fdw +CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret'); +DROP USER MAPPING FOR PUBLIC SERVER t3; +RESET ROLE; +CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x'); -- still fails +ERROR: password is required +DETAIL: Non-superusers must provide a password in the connection string. 
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret'); +DROP USER MAPPING FOR PUBLIC SERVER t3; +CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password_required 'false'); +DROP USER MAPPING FOR PUBLIC SERVER t3; +DROP SERVER t3; +REVOKE USAGE ON FOREIGN DATA WRAPPER pg_connection_fdw FROM regress_test_role; -- ALTER SERVER ALTER SERVER s0; -- ERROR ERROR: syntax error at or near ";" @@ -966,13 +1007,14 @@ NOTICE: relation "doesnt_exist_ft1" does not exist, skipping ALTER FOREIGN TABLE IF EXISTS doesnt_exist_ft1 RENAME TO foreign_table_1; NOTICE: relation "doesnt_exist_ft1" does not exist, skipping -- Information schema -SELECT * FROM information_schema.foreign_data_wrappers ORDER BY 1, 2; - foreign_data_wrapper_catalog | foreign_data_wrapper_name | authorization_identifier | library_name | foreign_data_wrapper_language -------------------------------+---------------------------+---------------------------+--------------+------------------------------- - regression | dummy | regress_foreign_data_user | | c - regression | foo | regress_foreign_data_user | | c - regression | postgresql | regress_foreign_data_user | | c -(3 rows) +SELECT foreign_data_wrapper_catalog,foreign_data_wrapper_name FROM information_schema.foreign_data_wrappers ORDER BY 1, 2; + foreign_data_wrapper_catalog | foreign_data_wrapper_name +------------------------------+--------------------------- + regression | dummy + regression | foo + regression | pg_connection_fdw + regression | postgresql +(4 rows) SELECT * FROM information_schema.foreign_data_wrapper_options ORDER BY 1, 2, 3; foreign_data_wrapper_catalog | foreign_data_wrapper_name | option_name | option_value @@ -2186,7 +2228,7 @@ NOTICE: drop cascades to server s0 \c DROP ROLE regress_foreign_data_user; -- At this point we should have no wrappers, no servers, and no mappings. 
-SELECT fdwname, fdwhandler, fdwvalidator, fdwoptions FROM pg_foreign_data_wrapper; +SELECT fdwname, fdwhandler, fdwvalidator, fdwoptions FROM pg_foreign_data_wrapper WHERE oid >= 16384; fdwname | fdwhandler | fdwvalidator | fdwoptions ---------+------------+--------------+------------ (0 rows) diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index b15eddbff3..738f1b1678 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -144,6 +144,44 @@ ERROR: could not connect to the publisher: invalid port number: "-1" ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string +-- temporarily revoke pg_create_connection from pg_create_subscription +-- to test that CREATE SUBSCRIPTION ... CONNECTION fails +RESET SESSION AUTHORIZATION; +REVOKE pg_create_connection FROM pg_create_subscription; +GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3; +SET SESSION AUTHORIZATION regress_subscription_user3; +-- fail - not a member of pg_create_connection, cannot use CONNECTION +CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false); +ERROR: permission denied to create subscription with a connection string +DETAIL: Only roles with privileges of the "pg_create_connection" role may create subscriptions with CONNECTION specified. +HINT: Create a subscription to a foreign server by specifying SERVER instead. 
+-- re-grant pg_create_connection to pg_create_subscription +RESET SESSION AUTHORIZATION; +GRANT pg_create_connection TO pg_create_subscription; +SET SESSION AUTHORIZATION regress_subscription_user3; +CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +DROP SUBSCRIPTION regress_testsub6; +-- test using a server object instead of connection string +RESET SESSION AUTHORIZATION; +CREATE SERVER regress_testserver FOREIGN DATA WRAPPER pg_connection_fdw; +CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver + OPTIONS (password_required 'false'); +GRANT USAGE ON FOREIGN SERVER regress_testserver TO regress_subscription_user3; +SET SESSION AUTHORIZATION regress_subscription_user3; +CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver PUBLICATION testpub + WITH (slot_name = NONE, connect = false); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. 
+RESET SESSION AUTHORIZATION; +ALTER SUBSCRIPTION regress_testsub6 OWNER TO regress_subscription_user; -- fail, no user mapping +ERROR: user mapping not found for user "regress_subscription_user", server "regress_testserver" +DROP SUBSCRIPTION regress_testsub6; +DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver; +DROP SERVER regress_testserver; +REVOKE CREATE ON DATABASE regression FROM regress_subscription_user3; +SET SESSION AUTHORIZATION regress_subscription_user; \dRs+ List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN diff --git a/src/test/regress/sql/foreign_data.sql b/src/test/regress/sql/foreign_data.sql index eefb860adc..15a28de9ac 100644 --- a/src/test/regress/sql/foreign_data.sql +++ b/src/test/regress/sql/foreign_data.sql @@ -36,7 +36,7 @@ COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless'; CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator; -- At this point we should have 2 built-in wrappers and no servers. -SELECT fdwname, fdwhandler::regproc, fdwvalidator::regproc, fdwoptions FROM pg_foreign_data_wrapper ORDER BY 1, 2, 3; +SELECT fdwname, fdwhandler::regproc, fdwvalidator::regproc, fdwoptions FROM pg_foreign_data_wrapper WHERE oid >= 16384 ORDER BY 1, 2, 3; SELECT srvname, srvoptions FROM pg_foreign_server; SELECT * FROM pg_user_mapping; @@ -180,6 +180,41 @@ CREATE SERVER t2 FOREIGN DATA WRAPPER foo; RESET ROLE; REVOKE regress_test_indirect FROM regress_test_role; +-- test SERVER ... 
FOR CONNECTION ONLY + +SET ROLE regress_test_role; +CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw; -- ERROR: not a member of pg_create_connection +RESET ROLE; +GRANT USAGE ON FOREIGN DATA WRAPPER pg_connection_fdw TO regress_test_role; +SET ROLE regress_test_role; + +CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (client_encoding 'foo'); --fails +CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (user 'foo'); --fails +CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (password 'foo'); --fails +CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (password_required 'true'); --fails +CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw; + +IMPORT FOREIGN SCHEMA foo FROM SERVER t3 INTO bar; -- fails + +CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x'); -- fails +CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password_required 'false'); -- fails +CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', application_name 'nonsense'); -- fails + +CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret'); +DROP USER MAPPING FOR PUBLIC SERVER t3; + +RESET ROLE; +CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x'); -- still fails + +CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret'); +DROP USER MAPPING FOR PUBLIC SERVER t3; + +CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password_required 'false'); + +DROP USER MAPPING FOR PUBLIC SERVER t3; +DROP SERVER t3; +REVOKE USAGE ON FOREIGN DATA WRAPPER pg_connection_fdw FROM regress_test_role; + -- ALTER SERVER ALTER SERVER s0; -- ERROR ALTER SERVER s0 OPTIONS (a '1'); -- ERROR @@ -453,7 +488,7 @@ ALTER FOREIGN TABLE IF EXISTS doesnt_exist_ft1 RENAME TO foreign_table_1; -- Information schema -SELECT * FROM information_schema.foreign_data_wrappers ORDER BY 1, 2; +SELECT foreign_data_wrapper_catalog,foreign_data_wrapper_name FROM information_schema.foreign_data_wrappers 
ORDER BY 1, 2; SELECT * FROM information_schema.foreign_data_wrapper_options ORDER BY 1, 2, 3; SELECT * FROM information_schema.foreign_servers ORDER BY 1, 2; SELECT * FROM information_schema.foreign_server_options ORDER BY 1, 2, 3; @@ -861,6 +896,6 @@ DROP FOREIGN DATA WRAPPER dummy CASCADE; DROP ROLE regress_foreign_data_user; -- At this point we should have no wrappers, no servers, and no mappings. -SELECT fdwname, fdwhandler, fdwvalidator, fdwoptions FROM pg_foreign_data_wrapper; +SELECT fdwname, fdwhandler, fdwvalidator, fdwoptions FROM pg_foreign_data_wrapper WHERE oid >= 16384; SELECT srvname, srvoptions FROM pg_foreign_server; SELECT * FROM pg_user_mapping; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 444e563ff3..bef6bc8074 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -88,6 +88,45 @@ CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'port=-1' PUBLICATION testpub; -- fail - invalid connection string during ALTER ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; +-- temporarily revoke pg_create_connection from pg_create_subscription +-- to test that CREATE SUBSCRIPTION ... 
CONNECTION fails +RESET SESSION AUTHORIZATION; +REVOKE pg_create_connection FROM pg_create_subscription; +GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3; +SET SESSION AUTHORIZATION regress_subscription_user3; + +-- fail - not a member of pg_create_connection, cannot use CONNECTION +CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false); + +-- re-grant pg_create_connection to pg_create_subscription +RESET SESSION AUTHORIZATION; +GRANT pg_create_connection TO pg_create_subscription; +SET SESSION AUTHORIZATION regress_subscription_user3; + +CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false); +DROP SUBSCRIPTION regress_testsub6; + +-- test using a server object instead of connection string + +RESET SESSION AUTHORIZATION; +CREATE SERVER regress_testserver FOREIGN DATA WRAPPER pg_connection_fdw; +CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver + OPTIONS (password_required 'false'); +GRANT USAGE ON FOREIGN SERVER regress_testserver TO regress_subscription_user3; + +SET SESSION AUTHORIZATION regress_subscription_user3; +CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver PUBLICATION testpub + WITH (slot_name = NONE, connect = false); +RESET SESSION AUTHORIZATION; + +ALTER SUBSCRIPTION regress_testsub6 OWNER TO regress_subscription_user; -- fail, no user mapping +DROP SUBSCRIPTION regress_testsub6; + +DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver; +DROP SERVER regress_testserver; +REVOKE CREATE ON DATABASE regression FROM regress_subscription_user3; +SET SESSION AUTHORIZATION regress_subscription_user; + \dRs+ ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); diff --git a/src/test/subscription/t/001_rep_changes.pl 
b/src/test/subscription/t/001_rep_changes.pl index 16c7fb94eb..d12c182f16 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -27,6 +27,8 @@ $node_publisher->safe_psql('postgres', "CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a"); $node_publisher->safe_psql('postgres', "CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_ins2 AS SELECT generate_series(1,1002) AS a"); $node_publisher->safe_psql('postgres', "CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a"); $node_publisher->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)"); @@ -65,6 +67,7 @@ $node_publisher->safe_psql('postgres', # Setup structure on subscriber $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_notrep (a int)"); $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins2 (a int)"); $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int)"); $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)"); $node_subscriber->safe_psql('postgres', @@ -110,6 +113,22 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only" ); +my $publisher_host = $node_publisher->host; +my $publisher_port = $node_publisher->port; +$node_subscriber->safe_psql('postgres', + "CREATE SERVER tap_sub2_server FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')" +); + +$node_subscriber->safe_psql('postgres', + "CREATE USER MAPPING FOR PUBLIC SERVER tap_sub2_server OPTIONS (password_required 'false')" +); + +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_simple_pub FOR TABLE tab_ins2"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub2 SERVER tap_sub2_server PUBLICATION 
tap_simple_pub" +); + # Wait for initial table sync to finish $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); @@ -121,11 +140,22 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins"); is($result, qq(1002), 'check initial data was copied to subscriber'); +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins2"); +is($result, qq(1002), 'check initial data was copied to subscriber'); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr'"); + $node_publisher->safe_psql('postgres', "INSERT INTO tab_ins SELECT generate_series(1,50)"); $node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20"); $node_publisher->safe_psql('postgres', "UPDATE tab_ins SET a = -a"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_ins2 SELECT generate_series(1,50)"); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub2 SERVER tap_sub2_server"); + $node_publisher->safe_psql('postgres', "INSERT INTO tab_rep SELECT generate_series(1,50)"); $node_publisher->safe_psql('postgres', "DELETE FROM tab_rep WHERE a > 20"); @@ -158,6 +188,10 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_ins"); is($result, qq(1052|1|1002), 'check replicated inserts on subscriber'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_ins2"); +is($result, qq(1052|1|1002), 'check replicated inserts on subscriber'); + $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_rep"); is($result, qq(20|-20|-1), 'check replicated changes on subscriber'); @@ -449,10 +483,27 @@ $node_publisher->poll_query_until('postgres', or die "Timed out while waiting for apply to restart after changing PUBLICATION"; +# test that changes to a foreign server subscription cause the worker +# to restart +$oldpid = 
$node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub2' AND state = 'streaming';" +); +$node_subscriber->safe_psql('postgres', + "ALTER SERVER tap_sub2_server OPTIONS (sslmode 'disable')" +); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub2' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing PUBLICATION"; + $node_publisher->safe_psql('postgres', "INSERT INTO tab_ins SELECT generate_series(1001,1100)"); $node_publisher->safe_psql('postgres', "DELETE FROM tab_rep"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_ins2 SELECT generate_series(1001,1100)"); + # Restart the publisher and check the state of the subscriber which # should be in a streaming state after catching up. $node_publisher->stop('fast'); @@ -465,6 +516,11 @@ $result = $node_subscriber->safe_psql('postgres', is($result, qq(1152|1|1100), 'check replicated inserts after subscription publication change'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_ins2"); +is($result, qq(1152|1|1100), + 'check replicated inserts after subscription publication change'); + $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_rep"); is($result, qq(20|-20|-1), @@ -533,6 +589,7 @@ $node_publisher->poll_query_until('postgres', # check all the cleanup $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed"); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub2"); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); -- 2.34.1