On Fri, 2024-03-08 at 00:20 -0800, Jeff Davis wrote:
> Implemented in v11, attached.
Rebased, v12 attached.
Regards,
Jeff Davis
From 5c2a8f5cb865becd70b08379d9fc72946be9a32a Mon Sep 17 00:00:00 2001
From: Jeff Davis <[email protected]>
Date: Tue, 2 Jan 2024 13:42:48 -0800
Subject: [PATCH v12] CREATE SUBSCRIPTION ... SERVER.
Allow specifying a foreign server for CREATE SUBSCRIPTION, rather than
a raw connection string with CONNECTION.
Using a foreign server as a layer of indirection improves management
of multiple subscriptions to the same server. It also provides
integration with user mappings in case different subscriptions have
different owners or a subscription changes owners.
Discussion: https://postgr.es/m/[email protected]
Reviewed-by: Ashutosh Bapat
---
contrib/postgres_fdw/Makefile | 2 +
contrib/postgres_fdw/connection.c | 73 ++++++++
.../postgres_fdw/expected/postgres_fdw.out | 8 +
contrib/postgres_fdw/meson.build | 5 +
.../postgres_fdw/postgres_fdw--1.1--1.2.sql | 8 +
contrib/postgres_fdw/sql/postgres_fdw.sql | 7 +
contrib/postgres_fdw/t/010_subscription.pl | 71 ++++++++
doc/src/sgml/ref/alter_subscription.sgml | 18 +-
doc/src/sgml/ref/create_subscription.sgml | 11 +-
src/backend/catalog/pg_subscription.c | 38 +++-
src/backend/commands/foreigncmds.c | 58 +++++-
src/backend/commands/subscriptioncmds.c | 168 ++++++++++++++++--
src/backend/foreign/foreign.c | 66 +++++++
src/backend/parser/gram.y | 22 +++
src/backend/replication/logical/worker.c | 16 +-
src/bin/pg_dump/pg_dump.c | 36 +++-
src/bin/pg_dump/pg_dump.h | 1 +
src/bin/psql/tab-complete.in.c | 2 +-
src/include/catalog/pg_foreign_data_wrapper.h | 3 +
src/include/catalog/pg_subscription.h | 7 +-
src/include/foreign/foreign.h | 3 +
src/include/nodes/parsenodes.h | 3 +
src/test/regress/expected/oidjoins.out | 1 +
23 files changed, 591 insertions(+), 36 deletions(-)
create mode 100644 contrib/postgres_fdw/t/010_subscription.pl
diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index 88fdce40d6..a101418d6e 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -18,6 +18,8 @@ DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.s
REGRESS = postgres_fdw query_cancel
+TAP_TESTS = 1
+
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 2326f391d3..48c77a8de3 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -123,6 +123,7 @@ PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
+PG_FUNCTION_INFO_V1(postgres_fdw_connection);
/* prototypes of private functions */
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
@@ -2161,6 +2162,78 @@ postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
}
}
+/*
+ * appendEscapedValue
+ *		Append "val" to "str" as a quoted connection-string value.
+ *
+ * The value is wrapped in single quotes, and every embedded single quote or
+ * backslash is preceded by a backslash.  NB: this is the quoting convention
+ * of connection strings, which differs from that of SQL literals.
+ */
+static void
+appendEscapedValue(StringInfo str, const char *val)
+{
+	const char *p;
+
+	/* opening quote */
+	appendStringInfoChar(str, '\'');
+
+	for (p = val; *p != '\0'; p++)
+	{
+		switch (*p)
+		{
+			case '\'':
+			case '\\':
+				/* these two characters need a backslash in front */
+				appendStringInfoChar(str, '\\');
+				break;
+			default:
+				break;
+		}
+		appendStringInfoChar(str, *p);
+	}
+
+	/* closing quote */
+	appendStringInfoChar(str, '\'');
+}
+
+/*
+ * postgres_fdw_connection
+ *		Build a connection string for the given user and foreign server.
+ *
+ * Argument 0 is the OID of the user whose user mapping is consulted and
+ * argument 1 is the OID of the foreign server.  (The SQL declaration has a
+ * third, "internal" argument; it is not read here.)
+ *
+ * The generic options of the foreign server and of the user mapping are
+ * passed through ExtractConnectionOptions(), client_encoding is appended,
+ * and the resulting parameter set is verified with check_conn_params().
+ * The result is a text value consisting of space-separated
+ * "keyword = 'value'" pairs, with values quoted by appendEscapedValue().
+ */
+Datum
+postgres_fdw_connection(PG_FUNCTION_ARGS)
+{
+	Oid			userid = PG_GETARG_OID(0);
+	Oid			serverid = PG_GETARG_OID(1);
+	ForeignServer *server = GetForeignServer(serverid);
+	UserMapping *user = GetUserMapping(userid, serverid);
+	StringInfoData str;
+	const char **keywords;
+	const char **values;
+	const char *sep = "";
+	int			n;
+
+	/*
+	 * Construct connection params from generic options of ForeignServer and
+	 * UserMapping. (Some of them might not be libpq options, in which case
+	 * we'll just waste a few array slots.) Add 2 extra slots for
+	 * client_encoding and the end marker.
+	 */
+	n = list_length(server->options) + list_length(user->options) + 2;
+	keywords = (const char **) palloc(n * sizeof(char *));
+	values = (const char **) palloc(n * sizeof(char *));
+
+	n = 0;
+	n += ExtractConnectionOptions(server->options,
+								  keywords + n, values + n);
+	n += ExtractConnectionOptions(user->options,
+								  keywords + n, values + n);
+
+	/* Set client_encoding so that libpq can convert encoding properly. */
+	keywords[n] = "client_encoding";
+	values[n] = GetDatabaseEncodingName();
+	n++;
+
+	keywords[n] = values[n] = NULL;
+
+	/* verify the set of connection parameters */
+	check_conn_params(keywords, values, user);
+
+	/*
+	 * Emit "keyword = 'value'" pairs.  The separator must live outside the
+	 * loop: it is empty for the first pair and a single space afterwards.
+	 */
+	initStringInfo(&str);
+	for (int i = 0; i < n; i++)
+	{
+		appendStringInfo(&str, "%s%s = ", sep, keywords[i]);
+		appendEscapedValue(&str, values[i]);
+		sep = " ";
+	}
+
+	pfree(keywords);
+	pfree(values);
+	PG_RETURN_TEXT_P(cstring_to_text(str.data));
+}
+
/*
* List active foreign server connections.
*
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index f2bcd6aa98..dd560892da 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -256,6 +256,14 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
ANALYZE ft1;
ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
-- ===================================================================
+-- test subscription
+-- ===================================================================
+CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
+ PUBLICATION pub1 WITH (slot_name = NONE, connect = false);
+WARNING: subscription was created, but is not connected
+HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+DROP SUBSCRIPTION regress_pgfdw_subscription;
+-- ===================================================================
-- test error case for create publication on foreign table
-- ===================================================================
CREATE PUBLICATION testpub_ftbl FOR TABLE ft1; -- should fail
diff --git a/contrib/postgres_fdw/meson.build b/contrib/postgres_fdw/meson.build
index 3014086ba6..e19d8e4e31 100644
--- a/contrib/postgres_fdw/meson.build
+++ b/contrib/postgres_fdw/meson.build
@@ -41,4 +41,9 @@ tests += {
],
'regress_args': ['--dlpath', meson.build_root() / 'src/test/regress'],
},
+ 'tap': {
+ 'tests': [
+ 't/010_subscription.pl',
+ ],
+ },
}
diff --git a/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql b/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql
index 81aad4fcda..8981787d16 100644
--- a/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql
+++ b/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql
@@ -16,3 +16,11 @@ CREATE FUNCTION postgres_fdw_get_connections (
RETURNS SETOF record
AS 'MODULE_PATHNAME', 'postgres_fdw_get_connections_1_2'
LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+-- Connection-string function for postgres_fdw.  The first two arguments are
+-- the user OID and the foreign server OID; the result is a text connection
+-- string.
+-- takes internal parameter to prevent calling from SQL
+CREATE FUNCTION postgres_fdw_connection(oid, oid, internal)
+RETURNS text
+AS 'MODULE_PATHNAME'
+LANGUAGE C STRICT;
+
+-- Register the function above as the wrapper's CONNECTION function.
+ALTER FOREIGN DATA WRAPPER postgres_fdw CONNECTION postgres_fdw_connection;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 372fe6dad1..0ab28ea3d5 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -248,6 +248,13 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
ANALYZE ft1;
ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
+-- ===================================================================
+-- test subscription
+-- ===================================================================
+CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
+ PUBLICATION pub1 WITH (slot_name = NONE, connect = false);
+DROP SUBSCRIPTION regress_pgfdw_subscription;
+
-- ===================================================================
-- test error case for create publication on foreign table
-- ===================================================================
diff --git a/contrib/postgres_fdw/t/010_subscription.pl b/contrib/postgres_fdw/t/010_subscription.pl
new file mode 100644
index 0000000000..a39e8fdbba
--- /dev/null
+++ b/contrib/postgres_fdw/t/010_subscription.pl
@@ -0,0 +1,71 @@
+
+# Copyright (c) 2021-2024, PostgreSQL Global Development Group
+
+# Test logical replication where the subscription reaches the publisher
+# through a postgres_fdw foreign server (CREATE SUBSCRIPTION ... SERVER)
+# instead of a raw connection string.  The same foreign server also backs a
+# foreign table, which is used to compare subscriber and publisher contents.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_ins AS SELECT a, a + 1 as b FROM generate_series(1,1002) AS a");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE EXTENSION postgres_fdw");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int, b int)");
+
+# Setup logical replication
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_ins");
+
+my $publisher_host = $node_publisher->host;
+my $publisher_port = $node_publisher->port;
+$node_subscriber->safe_psql('postgres',
+	"CREATE SERVER tap_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE USER MAPPING FOR PUBLIC SERVER tap_server"
+);
+
+# The foreign table reads the publisher's copy of tab_ins directly.
+$node_subscriber->safe_psql('postgres',
+	"CREATE FOREIGN TABLE f_tab_ins (a int, b int) SERVER tap_server OPTIONS(table_name 'tab_ins')"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub SERVER tap_server PUBLICATION tap_pub WITH (password_required=false)"
+);
+
+# Counts the rows of the local (replicated) table whose "b" value matches the
+# row with the same "a" fetched from the publisher through the foreign table.
+my $match_query =
+  "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match";
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+my $result = $node_subscriber->safe_psql('postgres', $match_query);
+is($result, qq(1002), 'check initial data was copied to subscriber');
+
+# Insert more rows on the publisher and make sure they are streamed
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_ins SELECT a, a + 1 FROM generate_series(1003,1050) a");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres', $match_query);
+is($result, qq(1050), 'check replicated inserts on subscriber');
+
+done_testing();
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index fdc648d007..35a8101796 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -21,6 +21,7 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SERVER <replaceable>servername</replaceable>
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>'
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
@@ -101,13 +102,24 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
</listitem>
</varlistentry>
+ <varlistentry id="sql-altersubscription-params-server">
+ <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
+ <listitem>
+ <para>
+ This clause replaces the foreign server or connection string originally
+ set by <xref linkend="sql-createsubscription"/> with the foreign server
+ <replaceable>servername</replaceable>.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="sql-altersubscription-params-connection">
<term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
<listitem>
<para>
- This clause replaces the connection string originally set by
- <xref linkend="sql-createsubscription"/>. See there for more
- information.
+ This clause replaces the foreign server or connection string originally
+ set by <xref linkend="sql-createsubscription"/> with the connection
+ string <replaceable>conninfo</replaceable>.
</para>
</listitem>
</varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 6cf7d4f9a1..f787a27bc9 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -22,7 +22,7 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceable>
- CONNECTION '<replaceable class="parameter">conninfo</replaceable>'
+ { SERVER <replaceable class="parameter">servername</replaceable> | CONNECTION '<replaceable class="parameter">conninfo</replaceable>' }
PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...]
[ WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
</synopsis>
@@ -77,6 +77,15 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</listitem>
</varlistentry>
+ <varlistentry id="sql-createsubscription-params-server">
+ <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
+ <listitem>
+ <para>
+ A foreign server to use for the connection.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="sql-createsubscription-params-connection">
<term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
<listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 89bf5ec933..66cae8ece0 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -19,11 +19,14 @@
#include "access/htup_details.h"
#include "access/tableam.h"
#include "catalog/indexing.h"
+#include "catalog/pg_foreign_server.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
+#include "foreign/foreign.h"
#include "miscadmin.h"
#include "storage/lmgr.h"
+#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
@@ -69,7 +72,7 @@ GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
* Fetch the subscription from the syscache.
*/
Subscription *
-GetSubscription(Oid subid, bool missing_ok)
+GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
{
HeapTuple tup;
Subscription *sub;
@@ -105,10 +108,35 @@ GetSubscription(Oid subid, bool missing_ok)
sub->failover = subform->subfailover;
/* Get conninfo */
- datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
- tup,
- Anum_pg_subscription_subconninfo);
- sub->conninfo = TextDatumGetCString(datum);
+ if (OidIsValid(subform->subserver))
+ {
+ AclResult aclresult;
+
+ /* recheck ACL if requested */
+ if (aclcheck)
+ {
+ aclresult = object_aclcheck(ForeignServerRelationId,
+ subform->subserver,
+ subform->subowner, ACL_USAGE);
+
+ if (aclresult != ACLCHECK_OK)
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
+ GetUserNameFromId(subform->subowner, false),
+ ForeignServerName(subform->subserver))));
+ }
+
+ sub->conninfo = ForeignServerConnectionString(subform->subowner,
+ subform->subserver);
+ }
+ else
+ {
+ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
+ tup,
+ Anum_pg_subscription_subconninfo);
+ sub->conninfo = TextDatumGetCString(datum);
+ }
/* Get slotname */
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c
index cf61bbac1f..c97450bfae 100644
--- a/src/backend/commands/foreigncmds.c
+++ b/src/backend/commands/foreigncmds.c
@@ -511,21 +511,53 @@ lookup_fdw_validator_func(DefElem *validator)
/* validator's return value is ignored, so we don't check the type */
}
+/*
+ * lookup_fdw_connection_func
+ *		Resolve the connection string function named in CREATE/ALTER FDW.
+ *
+ * Returns InvalidOid when no DefElem was supplied or when it carries no
+ * function name.  Otherwise the name is looked up with the argument
+ * signature (oid, oid, internal), and an error is raised unless the function
+ * found returns text.
+ */
+static Oid
+lookup_fdw_connection_func(DefElem *connection)
+{
+	Oid			argtypes[3] = {OIDOID, OIDOID, INTERNALOID};
+	List	   *funcname;
+	Oid			funcoid;
+
+	if (connection == NULL)
+		return InvalidOid;
+
+	funcname = (List *) connection->arg;
+	if (funcname == NULL)
+		return InvalidOid;
+
+	funcoid = LookupFuncName(funcname, 3, argtypes, false);
+
+	/* the connection string is handed back as text; insist on that */
+	if (get_func_rettype(funcoid) != TEXTOID)
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("function %s must return type %s",
+						NameListToString(funcname), "text")));
+
+	return funcoid;
+}
+
/*
* Process function options of CREATE/ALTER FDW
*/
static void
parse_func_options(ParseState *pstate, List *func_options,
bool *handler_given, Oid *fdwhandler,
- bool *validator_given, Oid *fdwvalidator)
+ bool *validator_given, Oid *fdwvalidator,
+ bool *connection_given, Oid *fdwconnection)
{
ListCell *cell;
*handler_given = false;
*validator_given = false;
+ *connection_given = false;
/* return InvalidOid if not given */
*fdwhandler = InvalidOid;
*fdwvalidator = InvalidOid;
+ *fdwconnection = InvalidOid;
foreach(cell, func_options)
{
@@ -545,6 +577,13 @@ parse_func_options(ParseState *pstate, List *func_options,
*validator_given = true;
*fdwvalidator = lookup_fdw_validator_func(def);
}
+ else if (strcmp(def->defname, "connection") == 0)
+ {
+ if (*connection_given)
+ errorConflictingDefElem(def, pstate);
+ *connection_given = true;
+ *fdwconnection = lookup_fdw_connection_func(def);
+ }
else
elog(ERROR, "option \"%s\" not recognized",
def->defname);
@@ -564,8 +603,10 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt)
Oid fdwId;
bool handler_given;
bool validator_given;
+ bool connection_given;
Oid fdwhandler;
Oid fdwvalidator;
+ Oid fdwconnection;
Datum fdwoptions;
Oid ownerId;
ObjectAddress myself;
@@ -609,10 +650,12 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt)
/* Lookup handler and validator functions, if given */
parse_func_options(pstate, stmt->func_options,
&handler_given, &fdwhandler,
- &validator_given, &fdwvalidator);
+ &validator_given, &fdwvalidator,
+ &connection_given, &fdwconnection);
values[Anum_pg_foreign_data_wrapper_fdwhandler - 1] = ObjectIdGetDatum(fdwhandler);
values[Anum_pg_foreign_data_wrapper_fdwvalidator - 1] = ObjectIdGetDatum(fdwvalidator);
+ values[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection);
nulls[Anum_pg_foreign_data_wrapper_fdwacl - 1] = true;
@@ -684,8 +727,10 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
Datum datum;
bool handler_given;
bool validator_given;
+ bool connection_given;
Oid fdwhandler;
Oid fdwvalidator;
+ Oid fdwconnection;
ObjectAddress myself;
rel = table_open(ForeignDataWrapperRelationId, RowExclusiveLock);
@@ -715,7 +760,8 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
parse_func_options(pstate, stmt->func_options,
&handler_given, &fdwhandler,
- &validator_given, &fdwvalidator);
+ &validator_given, &fdwvalidator,
+ &connection_given, &fdwconnection);
if (handler_given)
{
@@ -753,6 +799,12 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
fdwvalidator = fdwForm->fdwvalidator;
}
+ if (connection_given)
+ {
+ repl_val[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection);
+ repl_repl[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = true;
+ }
+
/*
* If options specified, validate and update.
*/
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 03e97730e7..ed5404cb0e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -26,14 +26,17 @@
#include "catalog/objectaddress.h"
#include "catalog/pg_authid_d.h"
#include "catalog/pg_database_d.h"
+#include "catalog/pg_foreign_server.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
+#include "catalog/pg_user_mapping.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/subscriptioncmds.h"
#include "executor/executor.h"
+#include "foreign/foreign.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "pgstat.h"
@@ -546,6 +549,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
Datum values[Natts_pg_subscription];
Oid owner = GetUserId();
HeapTuple tup;
+ Oid serverid;
char *conninfo;
char originname[NAMEDATALEN];
List *publications;
@@ -638,15 +642,40 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
if (opts.synchronous_commit == NULL)
opts.synchronous_commit = "off";
- conninfo = stmt->conninfo;
- publications = stmt->publication;
-
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
+ if (stmt->servername)
+ {
+ ForeignServer *server;
+
+ Assert(!stmt->conninfo);
+ conninfo = NULL;
+
+ server = GetForeignServerByName(stmt->servername, false);
+ aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
+
+ /* make sure a user mapping exists */
+ GetUserMapping(owner, server->serverid);
+
+ serverid = server->serverid;
+ conninfo = ForeignServerConnectionString(owner, serverid);
+ }
+ else
+ {
+ Assert(stmt->conninfo);
+
+ serverid = InvalidOid;
+ conninfo = stmt->conninfo;
+ }
+
/* Check the connection info string. */
walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
+ publications = stmt->publication;
+
/* Everything ok, form a new tuple. */
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
@@ -670,8 +699,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
- values[Anum_pg_subscription_subconninfo - 1] =
- CStringGetTextDatum(conninfo);
+ values[Anum_pg_subscription_subserver - 1] = serverid;
+ if (!OidIsValid(serverid))
+ values[Anum_pg_subscription_subconninfo - 1] =
+ CStringGetTextDatum(conninfo);
+ else
+ nulls[Anum_pg_subscription_subconninfo - 1] = true;
if (opts.slot_name)
values[Anum_pg_subscription_subslotname - 1] =
DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
@@ -692,6 +725,18 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
+ ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
+ if (stmt->servername)
+ {
+ ObjectAddress referenced;
+
+ Assert(OidIsValid(serverid));
+
+ ObjectAddressSet(referenced, ForeignServerRelationId, serverid);
+ recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+ }
+
ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
replorigin_create(originname);
@@ -809,8 +854,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
if (opts.enabled)
ApplyLauncherWakeupAtCommit();
- ObjectAddressSet(myself, SubscriptionRelationId, subid);
-
InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
return myself;
@@ -1135,7 +1178,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
stmt->subname);
- sub = GetSubscription(subid, false);
+ /*
+ * Skip ACL checks on the subscription's foreign server, if any. If
+ * changing the server (or replacing it with a raw connection), then the
+ * old one will be removed anyway. If changing something unrelated,
+ * there's no need to do an additional ACL check here; that will be done
+ * by the subscription worker anyway.
+ */
+ sub = GetSubscription(subid, false, false);
/*
* Don't allow non-superuser modification of a subscription with
@@ -1155,6 +1205,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
memset(nulls, false, sizeof(nulls));
memset(replaces, false, sizeof(replaces));
+ ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
switch (stmt->kind)
{
case ALTER_SUBSCRIPTION_OPTIONS:
@@ -1358,7 +1410,79 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
break;
}
+ case ALTER_SUBSCRIPTION_SERVER:
+ {
+ ForeignServer *new_server;
+ ObjectAddress referenced;
+ AclResult aclresult;
+ char *conninfo;
+
+ /*
+ * Remove what was there before, either another foreign server
+ * or a connection string.
+ */
+ if (form->subserver)
+ {
+ deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+ DEPENDENCY_NORMAL,
+ ForeignServerRelationId, form->subserver);
+ }
+ else
+ {
+ nulls[Anum_pg_subscription_subconninfo - 1] = true;
+ replaces[Anum_pg_subscription_subconninfo - 1] = true;
+ }
+
+ /*
+ * Find the new server and user mapping. Both the ACL check on
+ * the server and the user mapping lookup are based on the
+ * subscription owner, not on the current user ID.
+ */
+ new_server = GetForeignServerByName(stmt->servername, false);
+ aclresult = object_aclcheck(ForeignServerRelationId,
+ new_server->serverid,
+ form->subowner, ACL_USAGE);
+ if (aclresult != ACLCHECK_OK)
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
+ GetUserNameFromId(form->subowner, false),
+ ForeignServerName(new_server->serverid))));
+
+ /* make sure a user mapping exists */
+ GetUserMapping(form->subowner, new_server->serverid);
+
+ conninfo = ForeignServerConnectionString(form->subowner,
+ new_server->serverid);
+
+ /* Load the library providing us libpq calls. */
+ load_file("libpqwalreceiver", false);
+ /* Check the connection info string. */
+ walrcv_check_conninfo(conninfo,
+ sub->passwordrequired && !sub->ownersuperuser);
+
+ values[Anum_pg_subscription_subserver - 1] = new_server->serverid;
+ replaces[Anum_pg_subscription_subserver - 1] = true;
+
+ ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid);
+ recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+
+ update_tuple = true;
+ }
+ break;
+
case ALTER_SUBSCRIPTION_CONNECTION:
+ /* remove reference to foreign server and dependencies, if present */
+ if (form->subserver)
+ {
+ deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+ DEPENDENCY_NORMAL,
+ ForeignServerRelationId, form->subserver);
+
+ values[Anum_pg_subscription_subserver - 1] = InvalidOid;
+ replaces[Anum_pg_subscription_subserver - 1] = true;
+ }
+
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
/* Check the connection info string. */
@@ -1609,8 +1733,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
table_close(rel, RowExclusiveLock);
- ObjectAddressSet(myself, SubscriptionRelationId, subid);
-
InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
/* Wake up related replication workers to handle this change quickly. */
@@ -1695,9 +1817,28 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
subname = pstrdup(NameStr(*DatumGetName(datum)));
/* Get conninfo */
- datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
- Anum_pg_subscription_subconninfo);
- conninfo = TextDatumGetCString(datum);
+ if (OidIsValid(form->subserver))
+ {
+ AclResult aclresult;
+
+ aclresult = object_aclcheck(ForeignServerRelationId, form->subserver,
+ form->subowner, ACL_USAGE);
+ if (aclresult != ACLCHECK_OK)
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
+ GetUserNameFromId(form->subowner, false),
+ ForeignServerName(form->subserver))));
+
+ conninfo = ForeignServerConnectionString(form->subowner,
+ form->subserver);
+ }
+ else
+ {
+ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
+ Anum_pg_subscription_subconninfo);
+ conninfo = TextDatumGetCString(datum);
+ }
/* Get slotname */
datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
@@ -1796,6 +1937,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
}
/* Clean up dependencies */
+ deleteDependencyRecordsFor(SubscriptionRelationId, subid, false);
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
/* Remove any associated relation synchronization states. */
diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c
index 4c06e1ff1c..958e41f87a 100644
--- a/src/backend/foreign/foreign.c
+++ b/src/backend/foreign/foreign.c
@@ -71,6 +71,7 @@ GetForeignDataWrapperExtended(Oid fdwid, bits16 flags)
fdw->fdwname = pstrdup(NameStr(fdwform->fdwname));
fdw->fdwhandler = fdwform->fdwhandler;
fdw->fdwvalidator = fdwform->fdwvalidator;
+ fdw->fdwconnection = fdwform->fdwconnection;
/* Extract the fdwoptions */
datum = SysCacheGetAttr(FOREIGNDATAWRAPPEROID,
@@ -175,6 +176,31 @@ GetForeignServerExtended(Oid serverid, bits16 flags)
}
+/*
+ * ForeignServerName - return a pstrdup'd copy of a foreign server's name.
+ *
+ * Raises an error if the syscache has no foreign server with the given OID.
+ */
+char *
+ForeignServerName(Oid serverid)
+{
+	HeapTuple	tuple = SearchSysCache1(FOREIGNSERVEROID,
+										ObjectIdGetDatum(serverid));
+	char	   *name;
+
+	if (!HeapTupleIsValid(tuple))
+		elog(ERROR, "cache lookup failed for foreign server %u", serverid);
+
+	/* copy the name out before the cache entry is released */
+	name = pstrdup(NameStr(((Form_pg_foreign_server) GETSTRUCT(tuple))->srvname));
+	ReleaseSysCache(tuple);
+
+	return name;
+}
+
+
/*
* GetForeignServerByName - look up the foreign server definition by name.
*/
@@ -190,6 +216,46 @@ GetForeignServerByName(const char *srvname, bool missing_ok)
}
+/*
+ * ForeignServerConnectionString
+ *		Retrieve connection string from server's FDW.
+ *
+ * Calls the connection function of the foreign data wrapper that owns
+ * "serverid", passing the user OID, the server OID and a NULL "internal"
+ * argument (matching the (oid, oid, internal) signature required when the
+ * function is registered).  The result is returned as a C string allocated
+ * in the caller's memory context.
+ *
+ * Raises an error if the foreign data wrapper has no connection function.
+ */
+char *
+ForeignServerConnectionString(Oid userid, Oid serverid)
+{
+	MemoryContext tempContext;
+	MemoryContext oldcxt;
+	ForeignServer *server;
+	ForeignDataWrapper *fdw;
+	Datum		connection_datum;
+	text	   *connection_text;
+	char	   *result;
+
+	/*
+	 * Do the lookups and the function call in a short-lived context so that
+	 * whatever they allocate can be released in one go.  The context is
+	 * created and deleted on every call: a static context parented to
+	 * CurrentMemoryContext would be left dangling once that parent is reset
+	 * or deleted.
+	 */
+	tempContext = AllocSetContextCreate(CurrentMemoryContext,
+										"FDW connection string context",
+										ALLOCSET_DEFAULT_SIZES);
+
+	oldcxt = MemoryContextSwitchTo(tempContext);
+
+	server = GetForeignServer(serverid);
+	fdw = GetForeignDataWrapper(server->fdwid);
+
+	/* a wrapper without a connection function cannot be used here */
+	if (!OidIsValid(fdw->fdwconnection))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("foreign data wrapper \"%s\" does not have a connection function",
+						fdw->fdwname)));
+
+	connection_datum = OidFunctionCall3(fdw->fdwconnection,
+										ObjectIdGetDatum(userid),
+										ObjectIdGetDatum(serverid),
+										PointerGetDatum(NULL));
+	connection_text = DatumGetTextPP(connection_datum);
+
+	MemoryContextSwitchTo(oldcxt);
+
+	/* copy the result into the caller's context before cleaning up */
+	result = text_to_cstring(connection_text);
+
+	MemoryContextDelete(tempContext);
+
+	return result;
+}
+
+
/*
* GetUserMapping - look up the user mapping.
*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index dd458182f0..a7c759fb3f 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -5379,6 +5379,8 @@ fdw_option:
| NO HANDLER { $$ = makeDefElem("handler", NULL, @1); }
| VALIDATOR handler_name { $$ = makeDefElem("validator", (Node *) $2, @1); }
| NO VALIDATOR { $$ = makeDefElem("validator", NULL, @1); }
+ | CONNECTION handler_name { $$ = makeDefElem("connection", (Node *) $2, @1); }
+ | NO CONNECTION { $$ = makeDefElem("connection", NULL, @1); }
;
fdw_options:
@@ -10735,6 +10737,16 @@ CreateSubscriptionStmt:
n->options = $8;
$$ = (Node *) n;
}
+ | CREATE SUBSCRIPTION name SERVER name PUBLICATION name_list opt_definition
+ {
+ CreateSubscriptionStmt *n =
+ makeNode(CreateSubscriptionStmt);
+ n->subname = $3;
+ n->servername = $5;
+ n->publication = $7;
+ n->options = $8;
+ $$ = (Node *) n;
+ }
;
/*****************************************************************************
@@ -10764,6 +10776,16 @@ AlterSubscriptionStmt:
n->conninfo = $5;
$$ = (Node *) n;
}
+ | ALTER SUBSCRIPTION name SERVER name
+ {
+ AlterSubscriptionStmt *n =
+ makeNode(AlterSubscriptionStmt);
+
+ n->kind = ALTER_SUBSCRIPTION_SERVER;
+ n->subname = $3;
+ n->servername = $5;
+ $$ = (Node *) n;
+ }
| ALTER SUBSCRIPTION name REFRESH PUBLICATION opt_definition
{
AlterSubscriptionStmt *n =
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 925dff9cc4..4055ea9f13 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3971,7 +3971,7 @@ maybe_reread_subscription(void)
/* Ensure allocations in permanent context. */
oldctx = MemoryContextSwitchTo(ApplyContext);
- newsub = GetSubscription(MyLogicalRepWorker->subid, true);
+ newsub = GetSubscription(MyLogicalRepWorker->subid, true, true);
/*
* Exit if the subscription was removed. This normally should not happen
@@ -4077,7 +4077,9 @@ maybe_reread_subscription(void)
}
/*
- * Callback from subscription syscache invalidation.
+ * Callback from subscription syscache invalidation. Also needed for server or
+ * user mapping invalidation, which can change the connection information for
+ * subscriptions that connect using a server object.
*/
static void
subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -4659,7 +4661,7 @@ InitializeLogRepWorker(void)
StartTransactionCommand();
oldctx = MemoryContextSwitchTo(ApplyContext);
- MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+ MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true);
if (!MySubscription)
{
ereport(LOG,
@@ -4696,6 +4698,14 @@ InitializeLogRepWorker(void)
CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
subscription_change_cb,
(Datum) 0);
+ /* Foreign server changes can alter a subscription's connection info. */
+ CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+ subscription_change_cb,
+ (Datum) 0);
+ /* User mapping changes can alter a subscription's connection info. */
+ CacheRegisterSyscacheCallback(USERMAPPINGOID,
+ subscription_change_cb,
+ (Datum) 0);
CacheRegisterSyscacheCallback(AUTHOID,
subscription_change_cb,
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index d8c6330732..0efb4a554d 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4842,6 +4842,7 @@ getSubscriptions(Archive *fout)
int i_subdisableonerr;
int i_subpasswordrequired;
int i_subrunasowner;
+ int i_subservername;
int i_subconninfo;
int i_subslotname;
int i_subsynccommit;
@@ -4922,16 +4923,29 @@ getSubscriptions(Archive *fout)
if (fout->remoteVersion >= 170000)
appendPQExpBufferStr(query,
- " s.subfailover\n");
+ " s.subfailover,\n");
else
appendPQExpBuffer(query,
- " false AS subfailover\n");
+ " false AS subfailover,\n");
+
+ if (dopt->binary_upgrade && fout->remoteVersion >= 180000)
+ appendPQExpBufferStr(query, " fs.srvname AS subservername,\n"
+ " o.remote_lsn AS suboriginremotelsn,\n"
+ " s.subenabled,\n"
+ " s.subfailover\n");
+ else
+ appendPQExpBufferStr(query, " NULL AS subservername,\n"
+ " NULL AS suboriginremotelsn,\n"
+ " false AS subenabled,\n"
+ " false AS subfailover\n");
appendPQExpBufferStr(query,
"FROM pg_subscription s\n");
if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
appendPQExpBufferStr(query,
+ "LEFT JOIN pg_catalog.pg_foreign_server fs \n"
+ " ON fs.oid = s.subserver \n"
"LEFT JOIN pg_catalog.pg_replication_origin_status o \n"
" ON o.external_id = 'pg_' || s.oid::text \n");
@@ -4957,6 +4971,7 @@ getSubscriptions(Archive *fout)
i_subdisableonerr = PQfnumber(res, "subdisableonerr");
i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
i_subrunasowner = PQfnumber(res, "subrunasowner");
+ i_subservername = PQfnumber(res, "subservername");
i_subconninfo = PQfnumber(res, "subconninfo");
i_subslotname = PQfnumber(res, "subslotname");
i_subsynccommit = PQfnumber(res, "subsynccommit");
@@ -4977,7 +4992,10 @@ getSubscriptions(Archive *fout)
AssignDumpId(&subinfo[i].dobj);
subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner));
-
+ if (PQgetisnull(res, i, i_subservername))
+ subinfo[i].subservername = NULL;
+ else
+ subinfo[i].subservername = pg_strdup(PQgetvalue(res, i, i_subservername));
subinfo[i].subbinary =
pg_strdup(PQgetvalue(res, i, i_subbinary));
subinfo[i].substream =
@@ -5205,9 +5223,17 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
qsubname);
- appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
+ appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s ",
qsubname);
- appendStringLiteralAH(query, subinfo->subconninfo, fout);
+ if (subinfo->subservername)
+ {
+ appendPQExpBuffer(query, "SERVER %s", fmtId(subinfo->subservername));
+ }
+ else
+ {
+ appendPQExpBuffer(query, "CONNECTION ");
+ appendStringLiteralAH(query, subinfo->subconninfo, fout);
+ }
/* Build list of quoted publications and append them to query. */
if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 9f907ed5ad..5f2aab297c 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -666,6 +666,7 @@ typedef struct _SubscriptionInfo
char *subdisableonerr;
char *subpasswordrequired;
char *subrunasowner;
+ char *subservername;
char *subconninfo;
char *subslotname;
char *subsynccommit;
diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c
index 1be0056af7..6e5459d200 100644
--- a/src/bin/psql/tab-complete.in.c
+++ b/src/bin/psql/tab-complete.in.c
@@ -3660,7 +3660,7 @@ match_previous_words(int pattern_id,
/* CREATE SUBSCRIPTION */
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny))
- COMPLETE_WITH("CONNECTION");
+ COMPLETE_WITH("SERVER", "CONNECTION");
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", MatchAny))
COMPLETE_WITH("PUBLICATION");
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION",
diff --git a/src/include/catalog/pg_foreign_data_wrapper.h b/src/include/catalog/pg_foreign_data_wrapper.h
index 0d8759d3fd..700d6eed65 100644
--- a/src/include/catalog/pg_foreign_data_wrapper.h
+++ b/src/include/catalog/pg_foreign_data_wrapper.h
@@ -36,6 +36,9 @@ CATALOG(pg_foreign_data_wrapper,2328,ForeignDataWrapperRelationId)
Oid fdwvalidator BKI_LOOKUP_OPT(pg_proc); /* option validation
* function, or 0 if
* none */
+ Oid fdwconnection BKI_LOOKUP_OPT(pg_proc); /* connection string
+ * function, or 0 if
+ * none */
#ifdef CATALOG_VARLEN /* variable-length fields start here */
aclitem fdwacl[1]; /* access permissions */
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index b25f3fea56..38d1c783a5 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -98,9 +98,11 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
* slots) in the upstream database are enabled
* to be synchronized to the standbys. */
+ Oid subserver; /* Set if connecting with server */
+
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */
- text subconninfo BKI_FORCE_NOT_NULL;
+ text subconninfo; /* Set if connecting with connection string */
/* Slot name on publisher */
NameData subslotname BKI_FORCE_NULL;
@@ -174,7 +176,8 @@ typedef struct Subscription
*/
#define LOGICALREP_STREAM_PARALLEL 'p'
-extern Subscription *GetSubscription(Oid subid, bool missing_ok);
+extern Subscription *GetSubscription(Oid subid, bool missing_ok,
+ bool aclcheck);
extern void FreeSubscription(Subscription *sub);
extern void DisableSubscription(Oid subid);
diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h
index 82b8153100..b4025e7f1e 100644
--- a/src/include/foreign/foreign.h
+++ b/src/include/foreign/foreign.h
@@ -28,6 +28,7 @@ typedef struct ForeignDataWrapper
char *fdwname; /* Name of the FDW */
Oid fdwhandler; /* Oid of handler function, or 0 */
Oid fdwvalidator; /* Oid of validator function, or 0 */
+ Oid fdwconnection; /* Oid of connection string function, or 0 */
List *options; /* fdwoptions as DefElem list */
} ForeignDataWrapper;
@@ -65,10 +66,12 @@ typedef struct ForeignTable
extern ForeignServer *GetForeignServer(Oid serverid);
+extern char *ForeignServerName(Oid serverid);
extern ForeignServer *GetForeignServerExtended(Oid serverid,
bits16 flags);
extern ForeignServer *GetForeignServerByName(const char *srvname,
bool missing_ok);
+extern char *ForeignServerConnectionString(Oid userid, Oid serverid);
extern UserMapping *GetUserMapping(Oid userid, Oid serverid);
extern ForeignDataWrapper *GetForeignDataWrapper(Oid fdwid);
extern ForeignDataWrapper *GetForeignDataWrapperExtended(Oid fdwid,
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index b40b661ec8..30a7bde863 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -4222,6 +4222,7 @@ typedef struct CreateSubscriptionStmt
{
NodeTag type;
char *subname; /* Name of the subscription */
+ char *servername; /* Server name of publisher */
char *conninfo; /* Connection string to publisher */
List *publication; /* One or more publication to subscribe to */
List *options; /* List of DefElem nodes */
@@ -4230,6 +4231,7 @@ typedef struct CreateSubscriptionStmt
typedef enum AlterSubscriptionType
{
ALTER_SUBSCRIPTION_OPTIONS,
+ ALTER_SUBSCRIPTION_SERVER,
ALTER_SUBSCRIPTION_CONNECTION,
ALTER_SUBSCRIPTION_SET_PUBLICATION,
ALTER_SUBSCRIPTION_ADD_PUBLICATION,
@@ -4244,6 +4246,7 @@ typedef struct AlterSubscriptionStmt
NodeTag type;
AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */
char *subname; /* Name of the subscription */
+ char *servername; /* Server name of publisher */
char *conninfo; /* Connection string to publisher */
List *publication; /* One or more publication to subscribe to */
List *options; /* List of DefElem nodes */
diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out
index 215eb899be..59c64126bd 100644
--- a/src/test/regress/expected/oidjoins.out
+++ b/src/test/regress/expected/oidjoins.out
@@ -224,6 +224,7 @@ NOTICE: checking pg_extension {extconfig} => pg_class {oid}
NOTICE: checking pg_foreign_data_wrapper {fdwowner} => pg_authid {oid}
NOTICE: checking pg_foreign_data_wrapper {fdwhandler} => pg_proc {oid}
NOTICE: checking pg_foreign_data_wrapper {fdwvalidator} => pg_proc {oid}
+NOTICE: checking pg_foreign_data_wrapper {fdwconnection} => pg_proc {oid}
NOTICE: checking pg_foreign_server {srvowner} => pg_authid {oid}
NOTICE: checking pg_foreign_server {srvfdw} => pg_foreign_data_wrapper {oid}
NOTICE: checking pg_user_mapping {umuser} => pg_authid {oid}
--
2.34.1