On Tue, Apr 13, 2021 at 8:01 PM Bharath Rupireddy
<bharath.rupireddyforpostg...@gmail.com> wrote:
>
> On Tue, Apr 13, 2021 at 6:22 PM vignesh C <vignes...@gmail.com> wrote:
> > > 2) How about
> > > +          Specifies whether the subscriber must verify the
> > > publications that are
> > > +          being subscribed to are present in the publisher. By default,
> > > the subscriber
> > > instead of
> > > +          Specifies whether the subscriber must verify if the specified
> > > +          publications are present in the publisher. By default, the
> > > subscriber
> > >
> >
> > Slightly reworded and modified.
>
> +         <para>
> +          When true, the command will try to verify if the specified
> +          publications that are subscribed is present in the publisher.
> +          The default is <literal>false</literal>.
> +         </para>
>
> "publications that are subscribed" is not right as the subscriber is
> not yet subscribed, it is "trying to subscribing", and it's not that
> the command "will try to verify", it actually verifies. So you can
> modify as follows:
>
> +         <para>
> +          When true, the command verifies if all the specified
> publications that are being subscribed to are present in the publisher
> and throws an error if any of the publication doesn't exist. The
> default is <literal>false</literal>.
> +         </para>
>
> > > 3) I think we can make below common code into a single function with
> > > flags to differentiate processing for both, something like:
> > > StringInfoData *get_publist_str(List *publicaitons, bool use_quotes,
> > > bool is_fetch_table_list);
> > > check_publications:
> > > +        /* Convert the publications which does not exist into a string.
> > > */
> > > +        initStringInfo(&nonExistentPublications);
> > > +        foreach(lc, publicationsCopy)
> > > +        {
> > > and get_appended_publications_query:
> > >  foreach(lc, publications)
> > >
> > > With the new function that only prepares comma separated list of
> > > publications, you can get rid of get_appended_publications_query and
> > > just append the returned list to the query.
> > > fetch_table_list: get_publist_str(publications, true, true);
> > > check_publications: for select query preparation
> > > get_publist_str(publications, true, false); and for error string
> > > preparation get_publist_str(publications, false, false);
> > >
> > > And also let the new function get_publist_str allocate the string and
> > > just mention as a note in the function comment that the callers should
> > > pfree the returned string.
> > >
> >
> > I felt the existing code looks better, if we have a common function,
> > we will have to lot of if conditions as both the functions is not same
> > to same, they operate on different data types and do the preparation
> > appropriately. Like fetch_table_list get nspname & relname and
> > converts it to RangeVar and adds to the list other function prepares a
> > text and deletes the entries present from the list. So I did not fix
> > this. Thoughts?
>
> I was actually thinking we could move the following duplicate code
> into a function:
>     foreach(lc, publicationsCopy)
>     {
>         char       *pubname = strVal(lfirst(lc));
>
>         if (first)
>             first = false;
>         else
>             appendStringInfoString(&pubnames, ", ");
>         appendStringInfoString(&pubnames, "\"");
>         appendStringInfoString(&pubnames, pubname);
>         appendStringInfoString(&pubnames, "\"");
>     }
> and
>     foreach(lc, publications)
>     {
>         char       *pubname = strVal(lfirst(lc));
>
>         if (first)
>             first = false;
>         else
>             appendStringInfoString(cmd, ", ");
>
>         appendStringInfoString(cmd, quote_literal_cstr(pubname));
>     }
> that function can be:
> static void
> get_publications_str(List *publications, StringInfo dest, bool quote_literal)
> {
>     ListCell   *lc;
>     bool        first = true;
>
>     Assert(list_length(publications) > 0);
>
>     foreach(lc, publications)
>     {
>         char       *pubname = strVal(lfirst(lc));
>
>         if (first)
>             first = false;
>         else
>             appendStringInfoString(dest, ", ");
>
>         if (quote_literal)
>             appendStringInfoString(pubnames, quote_literal_cstr(pubname));
>         else
>         {
>             appendStringInfoString(&dest, "\"");
>             appendStringInfoString(&dest, pubname);
>             appendStringInfoString(&dest, "\"");
>         }
>     }
> }
>
> This way, we can get rid of get_appended_publications_query and use
> the above function to return the appended list of publications. We
> need to just pass quote_literal as true while preparing the publist
> string for publication query and append it to the query outside the
> function. While preparing publist str for error, pass quote_literal as
> false. Thoughts?
>

Modified.

> > > 7) You can just do
> > > publications = list_copy(publications);
> > > instead of using another variable publicationsCopy
> > >  publicationsCopy = list_copy(publications);
> >
> > publications is an input list to this function, I did not want this
> > function to change this list. I felt existing is fine. Thoughts?
>
> Okay.
>
> Typo - it's not "subcription" +# Create subcription for a publication
> which does not exist.
>

Modified.

> I think we can remove extra { } by moving the comment above if clause
> much like you did in AlterSubscription_refresh. And it's not "exists",
> it is "exist" change in both AlterSubscription_refresh and
> CreateSubscription.
> +        if (validate_publication)
> +        {
> +            /* Verify specified publications exists in the publisher. */
> +            check_publications(wrconn, publications);
> +        }
> +

Modified.

>
> Move /*no streaming */ to above NULL, NULL line:
> +                                   NULL, NULL,
> NULL, NULL);    /* no streaming */
>

Modified.

> Can we have a new function for below duplicate code? Something like:
> void connect_and_check_pubs(Subscription *sub, List *publications);?
> +                if (validate_publication)
> +                {
> +                    /* Load the library providing us libpq calls. */
> +                    load_file("libpqwalreceiver", false);
> +
> +                    /* Try to connect to the publisher. */
> +                    wrconn = walrcv_connect(sub->conninfo, true,
> sub->name, &err);
> +                    if (!wrconn)
> +                        ereport(ERROR,
> +                                (errmsg("could not connect to the
> publisher: %s", err)));
> +
> +                    /* Verify specified publications exists in the
> publisher. */
> +                    check_publications(wrconn, stmt->publication);
> +
> +                    /* We are done with the remote side, close connection. */
> +                    walrcv_disconnect(wrconn);
> +                }

Modified.

Thanks for the comments. The attached patch has the fixes for the same.
Thoughts?

Regards,
Vignesh

From 12acbcdce9dbc3a867c4ab0305d430ef55c4fa39 Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Wed, 7 Apr 2021 22:05:53 +0530
Subject: [PATCH v5] Identify missing publications from publisher while
 create/alter subscription.

Creating/altering subscription is successful when we specify a publication which
does not exist in the publisher. This patch checks if the specified publications
are present in the publisher and throws an error if any of the publications is
missing in the publisher.
---
 doc/src/sgml/ref/alter_subscription.sgml      |  13 +
 doc/src/sgml/ref/create_subscription.sgml     |  13 +
 src/backend/commands/subscriptioncmds.c       | 227 +++++++++++++++---
 src/bin/psql/tab-complete.c                   |   7 +-
 .../t/021_validate_publications.pl            |  82 +++++++
 5 files changed, 309 insertions(+), 33 deletions(-)
 create mode 100644 src/test/subscription/t/021_validate_publications.pl

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 367ac814f4..81e156437b 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -160,6 +160,19 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>validate_publication</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          When true, the command verifies if all the specified publications
+          that are being subscribed to are present in the publisher and throws
+          an error if any of the publications does not exist. The default is
+          <literal>false</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
       </variablelist></para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index e812beee37..2f1f541253 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -239,6 +239,19 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>validate_publication</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          When true, the command verifies if all the specified publications
+          that are being subscribed to are present in the publisher and throws
+          an error if any of the publications does not exist. The default is
+          <literal>false</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
       </variablelist></para>
     </listitem>
    </varlistentry>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 517c8edd3b..c7e7abb16c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -51,7 +51,6 @@ static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
 
-
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
  *
@@ -69,7 +68,9 @@ parse_subscription_options(List *options,
                            char **synchronous_commit,
                            bool *refresh,
                            bool *binary_given, bool *binary,
-                           bool *streaming_given, bool *streaming)
+                           bool *streaming_given, bool *streaming,
+                           bool *validate_publication_given,
+                           bool *validate_publication)
 {
     ListCell   *lc;
     bool        connect_given = false;
@@ -111,6 +112,12 @@ parse_subscription_options(List *options,
         *streaming = false;
     }
 
+    if (validate_publication)
+    {
+        *validate_publication_given = false;
+        *validate_publication = false;
+    }
+
     /* Parse options */
     foreach(lc, options)
     {
@@ -215,6 +222,16 @@ parse_subscription_options(List *options,
             *streaming_given = true;
             *streaming = defGetBoolean(defel);
         }
+        else if (strcmp(defel->defname, "validate_publication") == 0 && validate_publication)
+        {
+            if (*validate_publication_given)
+                ereport(ERROR,
+                        (errcode(ERRCODE_SYNTAX_ERROR),
+                         errmsg("conflicting or redundant options")));
+
+            *validate_publication_given = true;
+            *validate_publication = defGetBoolean(defel);
+        }
         else
             ereport(ERROR,
                     (errcode(ERRCODE_SYNTAX_ERROR),
@@ -247,10 +264,18 @@ parse_subscription_options(List *options,
                      errmsg("%s and %s are mutually exclusive options",
                             "connect = false", "copy_data = true")));
 
+        if (validate_publication && validate_publication_given &&
+            *validate_publication)
+            ereport(ERROR,
+                    (errcode(ERRCODE_SYNTAX_ERROR),
+                     errmsg("%s and %s are mutually exclusive options",
+                            "connect = false", "validate_publication = true")));
+
         /* Change the defaults of other options. */
         *enabled = false;
         *create_slot = false;
         *copy_data = false;
+        *validate_publication = false;
     }
 
     /*
@@ -287,6 +312,133 @@ parse_subscription_options(List *options,
     }
 }
 
+/*
+ * Append the list of publications to dest string.
+ */
+static void
+get_publications_str(List *publications, StringInfo dest, bool quote_literal)
+{
+    ListCell   *lc;
+    bool        first = true;
+
+    Assert(list_length(publications) > 0);
+
+    foreach(lc, publications)
+    {
+        char       *pubname = strVal(lfirst(lc));
+
+        if (first)
+            first = false;
+        else
+            appendStringInfoString(dest, ", ");
+
+        if (quote_literal)
+            appendStringInfoString(dest, quote_literal_cstr(pubname));
+        else
+        {
+            appendStringInfoString(dest, "\"");
+            appendStringInfoString(dest, pubname);
+            appendStringInfoString(dest, "\"");
+        }
+    }
+}
+
+/*
+ * Check the specified publication(s) is(are) present in the publisher.
+ */
+static void
+check_publications(WalReceiverConn *wrconn, List *publications)
+{
+    WalRcvExecResult *res;
+    StringInfo  cmd;
+    TupleTableSlot *slot;
+    List       *publicationsCopy = NIL;
+    Oid         tableRow[1] = {TEXTOID};
+
+    cmd = makeStringInfo();
+    appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
+                           " pg_catalog.pg_publication t WHERE\n"
+                           " t.pubname IN (");
+    get_publications_str(publications, cmd, true);
+    appendStringInfoChar(cmd, ')');
+
+    res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
+    pfree(cmd->data);
+    pfree(cmd);
+
+    if (res->status != WALRCV_OK_TUPLES)
+        ereport(ERROR,
+                (errmsg("could not receive list of publications from the publisher: %s",
+                        res->err)));
+
+    publicationsCopy = list_copy(publications);
+
+    /* Process publications. */
+    slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+    while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+    {
+        char       *pubname;
+        bool        isnull;
+
+        pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+        Assert(!isnull);
+
+        /* Delete the publication present in publisher from the list. */
+        publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
+        ExecClearTuple(slot);
+    }
+
+    ExecDropSingleTupleTableSlot(slot);
+
+    walrcv_clear_result(res);
+
+    if (list_length(publicationsCopy))
+    {
+        /* Prepare the list of non-existent publications for error message. */
+        StringInfo  pubnames = makeStringInfo();
+
+        get_publications_str(publicationsCopy, pubnames, false);
+        ereport(ERROR,
+                (errcode(ERRCODE_TOO_MANY_ARGUMENTS),
+                 errmsg_plural("publication %s does not exist in the publisher",
+                               "publications %s do not exist in the publisher",
+                               list_length(publicationsCopy),
+                               pubnames->data)));
+    }
+}
+
+/*
+ * Connect to the publisher and check if the publications exist.
+ */
+static void
+connect_and_check_pubs(Subscription *sub, List *publications,
+                       bool validate_publication)
+{
+    char       *err;
+
+    if (validate_publication)
+    {
+        /* Load the library providing us libpq calls. */
+        load_file("libpqwalreceiver", false);
+
+        /* Try to connect to the publisher. */
+        wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+        if (!wrconn)
+            ereport(ERROR,
+                    (errmsg("could not connect to the publisher: %s", err)));
+
+        /* Verify specified publications exist in the publisher. */
+        PG_TRY();
+        check_publications(wrconn, publications);
+        PG_FINALLY();
+        {
+            /* We are done with the remote side, close connection. */
+            walrcv_disconnect(wrconn);
+        }
+        PG_END_TRY();
+    }
+}
+
 /*
  * Auxiliary function to build a text array out of a list of String nodes.
  */
@@ -343,6 +495,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
     bool        slotname_given;
     bool        binary;
     bool        binary_given;
+    bool        validate_publication;
+    bool        validate_publication_given;
     char        originname[NAMEDATALEN];
     bool        create_slot;
     List       *publications;
@@ -361,7 +515,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
                                &synchronous_commit,
                                NULL,    /* no "refresh" */
                                &binary_given, &binary,
-                               &streaming_given, &streaming);
+                               &streaming_given, &streaming,
+                               &validate_publication_given,
+                               &validate_publication);
 
     /*
      * Since creating a replication slot is not transactional, rolling back
@@ -472,6 +628,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 
         PG_TRY();
         {
+            /* Verify specified publications exist in the publisher. */
+            if (validate_publication)
+                check_publications(wrconn, publications);
+
             /*
              * Set sync state based on if we were asked to do data copy or
              * not.
@@ -539,7 +699,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 }
 
 static void
-AlterSubscription_refresh(Subscription *sub, bool copy_data)
+AlterSubscription_refresh(Subscription *sub, bool copy_data, bool check_pub)
 {
     char       *err;
     List       *pubrel_names;
@@ -568,6 +728,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
         ereport(ERROR,
                 (errmsg("could not connect to the publisher: %s", err)));
 
+    /* Verify specified publications exist in the publisher. */
+    if (check_pub)
+        check_publications(wrconn, sub->publications);
+
     /* Get the table list from publisher. */
     pubrel_names = fetch_table_list(wrconn, sub->publications);
 
@@ -814,7 +978,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                                            &synchronous_commit,
                                            NULL,    /* no "refresh" */
                                            &binary_given, &binary,
-                                           &streaming_given, &streaming);
+                                           &streaming_given, &streaming,
+                                           NULL, NULL);
 
                 if (slotname_given)
                 {
@@ -871,7 +1036,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                                            NULL,    /* no "synchronous_commit" */
                                            NULL,    /* no "refresh" */
                                            NULL, NULL,    /* no "binary" */
-                                           NULL, NULL); /* no streaming */
+                                           NULL, NULL,    /* no streaming */
+                                           NULL, NULL);
                 Assert(enabled_given);
 
                 if (!sub->slotname && enabled)
@@ -906,6 +1072,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
             {
                 bool        copy_data;
                 bool        refresh;
+                bool        validate_publication;
+                bool        validate_publication_given;
 
                 parse_subscription_options(stmt->options,
                                            NULL,    /* no "connect" */
@@ -916,12 +1084,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                                            NULL,    /* no "synchronous_commit" */
                                            &refresh,
                                            NULL, NULL,    /* no "binary" */
-                                           NULL, NULL); /* no "streaming" */
+                                           NULL, NULL,    /* no "streaming" */
+                                           &validate_publication_given,
+                                           &validate_publication);
                 values[Anum_pg_subscription_subpublications - 1] =
                     publicationListToArray(stmt->publication);
                 replaces[Anum_pg_subscription_subpublications - 1] = true;
 
                 update_tuple = true;
+                connect_and_check_pubs(sub, stmt->publication, validate_publication);
 
                 /* Refresh if user asked us to. */
                 if (refresh)
@@ -937,7 +1108,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                     /* Make sure refresh sees the new list of publications. */
                     sub->publications = stmt->publication;
 
-                    AlterSubscription_refresh(sub, copy_data);
+                    AlterSubscription_refresh(sub, copy_data, false);
                 }
 
                 break;
@@ -950,6 +1121,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                 bool        copy_data;
                 bool        refresh;
                 List       *publist;
+                bool        validate_publication;
+                bool        validate_publication_given;
 
                 publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
 
@@ -963,13 +1136,18 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                                            NULL,    /* no "synchronous_commit" */
                                            &refresh,
                                            NULL, NULL,    /* no "binary" */
-                                           NULL, NULL); /* no "streaming" */
+                                           NULL, NULL,    /* no "streaming" */
+                                           /* for drop, no "validate_publication" */
+                                           isadd ? &validate_publication_given : NULL,
+                                           isadd ? &validate_publication : NULL);
 
                 values[Anum_pg_subscription_subpublications - 1] =
                     publicationListToArray(publist);
                 replaces[Anum_pg_subscription_subpublications - 1] = true;
 
                 update_tuple = true;
+                if (isadd)
+                    connect_and_check_pubs(sub, stmt->publication, validate_publication);
 
                 /* Refresh if user asked us to. */
                 if (refresh)
@@ -985,7 +1163,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                     /* Only refresh the added/dropped list of publications. */
                     sub->publications = stmt->publication;
 
-                    AlterSubscription_refresh(sub, copy_data);
+                    AlterSubscription_refresh(sub, copy_data, false);
                 }
 
                 break;
@@ -994,6 +1172,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
         case ALTER_SUBSCRIPTION_REFRESH:
             {
                 bool        copy_data;
+                bool        validate_publication;
+                bool        validate_publication_given;
 
                 if (!sub->enabled)
                     ereport(ERROR,
@@ -1009,11 +1189,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                                            NULL,    /* no "synchronous_commit" */
                                            NULL,    /* no "refresh" */
                                            NULL, NULL,    /* no "binary" */
-                                           NULL, NULL); /* no "streaming" */
+                                           NULL, NULL,    /* no "streaming" */
+                                           &validate_publication_given,
+                                           &validate_publication);
 
                 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
-                AlterSubscription_refresh(sub, copy_data);
+                AlterSubscription_refresh(sub, copy_data, validate_publication);
 
                 break;
             }
@@ -1476,28 +1658,13 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
     StringInfoData cmd;
     TupleTableSlot *slot;
     Oid         tableRow[2] = {TEXTOID, TEXTOID};
-    ListCell   *lc;
-    bool        first;
     List       *tablelist = NIL;
 
-    Assert(list_length(publications) > 0);
-
     initStringInfo(&cmd);
     appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
-                           "  FROM pg_catalog.pg_publication_tables t\n"
-                           " WHERE t.pubname IN (");
-    first = true;
-    foreach(lc, publications)
-    {
-        char       *pubname = strVal(lfirst(lc));
-
-        if (first)
-            first = false;
-        else
-            appendStringInfoString(&cmd, ", ");
-
-        appendStringInfoString(&cmd, quote_literal_cstr(pubname));
-    }
+                           " FROM pg_catalog.pg_publication_tables t\n"
+                           " WHERE t.pubname IN (");
+    get_publications_str(publications, &cmd, true);
     appendStringInfoChar(&cmd, ')');
 
     res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 7c4933333b..5a0d0f3c8d 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1674,7 +1674,7 @@ psql_completion(const char *text, int start, int end)
     /* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION WITH ( */
     else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
              TailMatches("REFRESH", "PUBLICATION", "WITH", "("))
-        COMPLETE_WITH("copy_data");
+        COMPLETE_WITH("copy_data", "validate_publication");
     /* ALTER SUBSCRIPTION <name> SET */
     else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, "SET"))
         COMPLETE_WITH("(", "PUBLICATION");
@@ -1693,7 +1693,7 @@ psql_completion(const char *text, int start, int end)
     /* ALTER SUBSCRIPTION <name> ADD|DROP|SET PUBLICATION <name> WITH ( */
     else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
              TailMatches("ADD|DROP|SET", "PUBLICATION", MatchAny, "WITH", "("))
-        COMPLETE_WITH("copy_data", "refresh");
+        COMPLETE_WITH("copy_data", "refresh", "validate_publication");
 
     /* ALTER SCHEMA <name> */
     else if (Matches("ALTER", "SCHEMA", MatchAny))
@@ -2780,7 +2780,8 @@ psql_completion(const char *text, int start, int end)
     /* Complete "CREATE SUBSCRIPTION <name> ... WITH ( <opt>" */
     else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
         COMPLETE_WITH("copy_data", "connect", "create_slot", "enabled",
-                      "slot_name", "synchronous_commit");
+                      "slot_name", "synchronous_commit",
+                      "validate_publication");
 
     /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/test/subscription/t/021_validate_publications.pl b/src/test/subscription/t/021_validate_publications.pl
new file mode 100644
index 0000000000..95676148da
--- /dev/null
+++ b/src/test/subscription/t/021_validate_publications.pl
@@ -0,0 +1,82 @@
+# Tests for the validate_publication subscription option
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 7;
+
+# Create subscription for a publication which does not exist.
+my $node_publisher = get_new_node('testpublisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = get_new_node('testsubscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION testpub1 FOR ALL TABLES");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION testsub1 CONNECTION '$publisher_connstr' PUBLICATION testpub1"
+);
+
+# Specified publication does not exist.
+my ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher_connstr' PUBLICATION pub_doesnt_exist WITH (VALIDATE_PUBLICATION = TRUE)"
+);
+ok( $stderr =~
+	  /ERROR:  publication "pub_doesnt_exist" does not exist in the publisher/,
+	"Create subscription for non existent publication fails");
+
+# Only one of the specified publications exists.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher_connstr' PUBLICATION testpub1, pub_doesnt_exist WITH (VALIDATE_PUBLICATION = TRUE)"
+);
+ok( $stderr =~
+	  /ERROR:  publication "pub_doesnt_exist" does not exist in the publisher/,
+	"Create subscription for non existent publication fails");
+# Multiple publications do not exist.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher_connstr' PUBLICATION pub_doesnt_exist, pub_doesnt_exist1 WITH (VALIDATE_PUBLICATION = TRUE)"
+);
+ok( $stderr =~
+	  /ERROR:  publications "pub_doesnt_exist", "pub_doesnt_exist1" do not exist in the publisher/,
+	"Create subscription for non existent publication fails");
+
+# Add non existent publication.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"ALTER SUBSCRIPTION testsub1 ADD PUBLICATION pub_doesnt_exist WITH (REFRESH = FALSE, VALIDATE_PUBLICATION = TRUE)"
+);
+ok( $stderr =~
+	  /ERROR:  publication "pub_doesnt_exist" does not exist in the publisher/,
+	"Alter subscription add non existent publication fails");
+
+# Specified publication does not exist.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"ALTER SUBSCRIPTION testsub1 SET PUBLICATION pub_doesnt_exist WITH (VALIDATE_PUBLICATION = TRUE)");
+ok( $stderr =~
+	  /ERROR:  publication "pub_doesnt_exist" does not exist in the publisher/,
+	"Alter subscription for non existent publication fails");
+
+# Specified publication does not exist with refresh = false.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"ALTER SUBSCRIPTION testsub1 SET PUBLICATION pub_doesnt_exist WITH (REFRESH = FALSE, VALIDATE_PUBLICATION = TRUE)"
+);
+ok( $stderr =~
+	  /ERROR:  publication "pub_doesnt_exist" does not exist in the publisher/,
+	"Alter subscription for non existent publication fails");
+
+# Set publication on non existent database.
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION testsub1 CONNECTION 'dbname=regress_doesnotexist2'");
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"ALTER SUBSCRIPTION testsub1 SET PUBLICATION pub_doesnt_exist WITH (REFRESH = FALSE, VALIDATE_PUBLICATION = TRUE)"
+);
+ok( $stderr =~ /ERROR:  could not connect to the publisher/,
+	"Alter subscription with non existent database fails to connect");
+
+$node_publisher->stop('fast');
+$node_subscriber->stop('fast');
-- 
2.25.1
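
For readers following the get_publications_str discussion above, here is a rough standalone sketch of the same idea in plain C, outside the PostgreSQL tree. It is only an illustration under stated assumptions, not the patch's code: publications_to_str, append_str and append_sql_literal are hypothetical names, a fixed-size buffer stands in for StringInfo (and is overwritten rather than appended to), and append_sql_literal is a simplified substitute for quote_literal_cstr() that only doubles single quotes.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Append src to the NUL-terminated string in dest (capacity cap).
 * Returns 0 on success, -1 if src does not fit.
 */
static int
append_str(char *dest, size_t cap, const char *src)
{
    size_t used = strlen(dest);
    size_t need = strlen(src);

    if (used + need + 1 > cap)
        return -1;
    memcpy(dest + used, src, need + 1);
    return 0;
}

/*
 * Append name as an SQL string literal: wrap it in single quotes and
 * double any embedded single quote.  Assumption: this is a simplified
 * stand-in for quote_literal_cstr(); backslashes are not handled.
 */
static int
append_sql_literal(char *dest, size_t cap, const char *name)
{
    char one[2] = {0, 0};

    if (append_str(dest, cap, "'") != 0)
        return -1;
    for (; *name; name++)
    {
        if (*name == '\'' && append_str(dest, cap, "'") != 0)
            return -1;
        one[0] = *name;
        if (append_str(dest, cap, one) != 0)
            return -1;
    }
    return append_str(dest, cap, "'");
}

/*
 * Write a comma-separated list of names[0..count-1] into dest.
 * quote_literal != 0: SQL literals, for the IN (...) list of a query.
 * quote_literal == 0: double-quoted names, for an error message.
 * Returns 0 on success, -1 if dest is too small.
 */
static int
publications_to_str(const char *const *names, size_t count,
                    char *dest, size_t cap, int quote_literal)
{
    size_t i;

    assert(count > 0);
    assert(cap > 0);
    dest[0] = '\0';

    for (i = 0; i < count; i++)
    {
        if (i > 0 && append_str(dest, cap, ", ") != 0)
            return -1;

        if (quote_literal)
        {
            if (append_sql_literal(dest, cap, names[i]) != 0)
                return -1;
        }
        else
        {
            if (append_str(dest, cap, "\"") != 0 ||
                append_str(dest, cap, names[i]) != 0 ||
                append_str(dest, cap, "\"") != 0)
                return -1;
        }
    }
    return 0;
}
```

With the two names pub1 and o'neil, a nonzero quote_literal gives 'pub1', 'o''neil', which is the form appended inside IN (...), while quote_literal set to 0 gives "pub1", "o'neil", which is the form used in the "does not exist in the publisher" message. Keeping the separator logic in one place is the point of the refactoring; only the per-name quoting differs between the two callers.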