On Tue, Mar 29, 2022 at 11:02 AM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Sat, Mar 26, 2022 at 7:53 PM vignesh C <vignes...@gmail.com> wrote:
> >
> > The patch was not applying on HEAD, attached patch which is rebased on
> > top of HEAD.
> >
>
> IIUC, this patch provides an option that allows us to give an error if
> while creating/altering subscription, user gives non-existent
> publications. I am not sure how useful it is to add such behavior via
> an option especially when we know that it can occur in some other ways
> like after creating the subscription, users can independently drop
> publication from publisher. I think it could be useful to provide
> additional information here but it would be better if we can follow
> Euler's suggestion [1] in the thread where he suggested issuing a
> WARNING if the publications don't exist and document that the
> subscription catalog can have non-existent publications.
>
> I think we should avoid adding new options unless they are really
> required and useful.
>
> [1] -
> https://www.postgresql.org/message-id/a2f2fba6-40dd-44cc-b40e-58196bb77f1c%40www.fastmail.com

Thanks for the suggestion. I have changed the patch accordingly: it now issues a WARNING for non-existent publications instead of adding a new option. The attached v16 patch has these changes.

Regards,
Vignesh

From 4232956ff31192e903736e921b92aac06243c171 Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignes...@gmail.com>
Date: Sat, 26 Mar 2022 19:43:27 +0530
Subject: [PATCH v16] Identify missing publications from publisher while create/alter subscription.

Creating/altering subscription is successful without throwing any warning
when we specify a publication which does not exist in the publisher. This
patch checks if the specified publications are present in the publisher and
throws a warning if any of the publication is missing in the publisher.

Author: Vignesh C
Reviewed-by: Amit Kapila, Bharath Rupireddy, Japin Li, Dilip Kumar, Euler Taveira, Ashutosh Sharma
Discussion: https://www.postgresql.org/message-id/flat/20220321235957.i4jtjn4wyjucex6b%40alap3.anarazel.de#b846aaaafd4ef657cfaa8c9890f044e4
---
 doc/src/sgml/ref/create_subscription.sgml |   7 ++
 src/backend/commands/subscriptioncmds.c   | 132 ++++++++++++++++++----
 src/test/subscription/t/007_ddl.pl        |  61 ++++++++++
 3 files changed, 178 insertions(+), 22 deletions(-)

diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index b701752fc9..761d34116c 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -356,6 +356,13 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
    copied data that would be incompatible with subsequent filtering.
   </para>
 
+  <para>
+   <link linkend="catalog-pg-subscription"><structname>pg_subscription</structname></link>
+   can have non-existent publications if non-existent publication was
+   specified during <command>CREATE SUBSCRIPTION</command> or if an existing
+   publication was removed.
+  </para>
+
  </refsect1>
 
  <refsect1>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index abebffdf3b..6db88f8443 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -375,6 +375,103 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 	}
 }
 
+/*
+ * Add publication names from the list to a string.
+ */
+static void
+get_publications_str(List *publications, StringInfo dest, bool quote_literal)
+{
+	ListCell *lc;
+	bool first = true;
+
+	Assert(list_length(publications) > 0);
+
+	foreach(lc, publications)
+	{
+		char *pubname = strVal(lfirst(lc));
+
+		if (first)
+			first = false;
+		else
+			appendStringInfoString(dest, ", ");
+
+		if (quote_literal)
+			appendStringInfoString(dest, quote_literal_cstr(pubname));
+		else
+		{
+			appendStringInfoChar(dest, '"');
+			appendStringInfoString(dest, pubname);
+			appendStringInfoChar(dest, '"');
+		}
+	}
+}
+
+/*
+ * Check the specified publication(s) is(are) present in the publisher.
+ */
+static void
+check_publications(WalReceiverConn *wrconn, List *publications)
+{
+	WalRcvExecResult *res;
+	StringInfo cmd;
+	TupleTableSlot *slot;
+	List *publicationsCopy = NIL;
+	Oid tableRow[1] = {TEXTOID};
+
+	cmd = makeStringInfo();
+	appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
+						   " pg_catalog.pg_publication t WHERE\n"
+						   " t.pubname IN (");
+	get_publications_str(publications, cmd, true);
+	appendStringInfoChar(cmd, ')');
+
+	res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
+	pfree(cmd->data);
+	pfree(cmd);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				errmsg_plural("could not receive publication from the publisher: %s",
+							  "could not receive list of publications from the publisher: %s",
+							  list_length(publications),
+							  res->err));
+
+	publicationsCopy = list_copy(publications);
+
+	/* Process publication(s). */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		char *pubname;
+		bool isnull;
+
+		pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		/* Delete the publication present in publisher from the list. */
+		publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
+		ExecClearTuple(slot);
+	}
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	walrcv_clear_result(res);
+
+	if (list_length(publicationsCopy))
+	{
+		/* Prepare the list of non-existent publication(s) for error message. */
+		StringInfo pubnames = makeStringInfo();
+
+		get_publications_str(publicationsCopy, pubnames, false);
+		ereport(WARNING,
+				errcode(ERRCODE_UNDEFINED_OBJECT),
+				errmsg_plural("publication %s does not exist in the publisher",
+							  "publications %s do not exist in the publisher",
+							  list_length(publicationsCopy),
+							  pubnames->data));
+	}
+}
+
 /*
  * Auxiliary function to build a text array out of a list of String nodes.
  */
@@ -555,6 +652,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
 		PG_TRY();
 		{
+			check_publications(wrconn, publications);
+
 			/*
 			 * Set sync state based on if we were asked to do data copy or
 			 * not.
@@ -650,7 +749,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 }
 
 static void
-AlterSubscription_refresh(Subscription *sub, bool copy_data)
+AlterSubscription_refresh(Subscription *sub, bool copy_data,
+						  bool validate_publication)
 {
 	char *err;
 	List *pubrel_names;
@@ -681,6 +781,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
 	PG_TRY();
 	{
+		if (validate_publication)
+			check_publications(wrconn, sub->publications);
+
 		/* Get the list of relations from publisher. */
 		pubrel_names = fetch_table_list(wrconn, sub->publications);
 		pubrel_names = list_concat(pubrel_names,
@@ -1048,7 +1151,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					/* Make sure refresh sees the new list of publications. */
 					sub->publications = stmt->publication;
 
-					AlterSubscription_refresh(sub, opts.copy_data);
+					AlterSubscription_refresh(sub, opts.copy_data, true);
 				}
 
 				break;
@@ -1096,7 +1199,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					/* Refresh the new list of publications. */
 					sub->publications = publist;
 
-					AlterSubscription_refresh(sub, opts.copy_data);
+					AlterSubscription_refresh(sub, opts.copy_data, isadd);
 				}
 
 				break;
@@ -1138,7 +1241,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
-				AlterSubscription_refresh(sub, opts.copy_data);
+				AlterSubscription_refresh(sub, opts.copy_data, false);
 
 				break;
 			}
@@ -1659,28 +1762,13 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid tableRow[2] = {TEXTOID, TEXTOID};
-	ListCell *lc;
-	bool first;
 	List *tablelist = NIL;
 
-	Assert(list_length(publications) > 0);
-
 	initStringInfo(&cmd);
 	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
-						   " FROM pg_catalog.pg_publication_tables t\n"
-						   " WHERE t.pubname IN (");
-	first = true;
-	foreach(lc, publications)
-	{
-		char *pubname = strVal(lfirst(lc));
-
-		if (first)
-			first = false;
-		else
-			appendStringInfoString(&cmd, ", ");
-
-		appendStringInfoString(&cmd, quote_literal_cstr(pubname));
-	}
+						   " FROM pg_catalog.pg_publication_tables t\n"
+						   " WHERE t.pubname IN (");
+	get_publications_str(publications, &cmd, true);
 	appendStringInfoChar(&cmd, ')');
 
 	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
diff --git a/src/test/subscription/t/007_ddl.pl b/src/test/subscription/t/007_ddl.pl
index 1144b005f6..c77ac5623b 100644
--- a/src/test/subscription/t/007_ddl.pl
+++ b/src/test/subscription/t/007_ddl.pl
@@ -41,6 +41,67 @@ COMMIT;
 
 pass "subscription disable and drop in same transaction did not hang";
 
+# One of the specified publications exists.
+my ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION mypub, non_existent_pub"
+);
+ok( $stderr =~
+	  m/WARNING: publication "non_existent_pub" does not exist in the publisher/,
+	"Create subscription throws warning for non-existent publication");
+
+$node_publisher->wait_for_catchup('mysub1');
+
+# Also wait for initial table sync to finish.
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION mysub1;");
+
+# Specifying multiple non-existent publications.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION non_existent_pub, non_existent_pub1"
+);
+ok( $stderr =~
+	  m/WARNING: publications "non_existent_pub", "non_existent_pub1" do not exist in the publisher/,
+	"Create subscription throws warning for multiple non-existent publications");
+
+$node_publisher->wait_for_catchup('mysub1');
+
+# Also wait for initial table sync to finish.
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION mysub1;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION mypub;"
+);
+
+$node_publisher->wait_for_catchup('mysub1');
+
+# Also wait for initial table sync to finish.
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Specifying non-existent publication along with add publication.
+($ret, $stdout, $stderr) = $node_subscriber->psql(
+	'postgres',
+	"ALTER SUBSCRIPTION mysub1 ADD PUBLICATION non_existent_pub"
+);
+ok( $stderr =~
+	  m/WARNING: publication "non_existent_pub" does not exist in the publisher/,
+	"Alter subscription add publication throws warning for non-existent publication");
+
+# Specifying non-existent publication along with set publication.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"ALTER SUBSCRIPTION mysub1 SET PUBLICATION non_existent_pub"
+);
+ok( $stderr =~
+	  m/WARNING: publication "non_existent_pub" does not exist in the publisher/,
+	"Alter subscription set publication throws warning for non-existent publication");
+
 $node_subscriber->stop;
 $node_publisher->stop;
 
-- 
2.32.0
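
For anyone who wants to look at the check in isolation, here is a rough standalone C sketch of the idea behind check_publications() in the patch: start from the requested publication names, drop the ones the publisher reported, and build the singular or plural warning text from whatever is left. It is only an illustration under stated assumptions. Plain C arrays stand in for PostgreSQL's List, snprintf stands in for StringInfo and errmsg_plural, and name_in_list() and format_missing_publications() are hypothetical names that do not appear in the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Return true if name occurs in names[0..n-1]. */
static bool
name_in_list(const char *name, const char *const *names, size_t n)
{
	for (size_t i = 0; i < n; i++)
	{
		if (strcmp(name, names[i]) == 0)
			return true;
	}
	return false;
}

/*
 * Hypothetical helper, not part of the patch.
 *
 * Collect every requested publication that is absent from found[] and write
 * a warning-style message into buf. Returns the number of missing
 * publications; buf is left empty when nothing is missing.
 */
size_t
format_missing_publications(const char *const *requested, size_t n_requested,
							const char *const *found, size_t n_found,
							char *buf, size_t buflen)
{
	char names[512] = "";
	size_t used = 0;
	size_t missing = 0;

	assert(n_requested > 0);
	assert(buflen > 0);
	buf[0] = '\0';

	for (size_t i = 0; i < n_requested; i++)
	{
		int n;

		if (name_in_list(requested[i], found, n_found))
			continue;

		/* Double-quote each name and separate entries with ", ". */
		n = snprintf(names + used, sizeof(names) - used, "%s\"%s\"",
					 used > 0 ? ", " : "", requested[i]);
		if (n < 0 || (size_t) n >= sizeof(names) - used)
			names[used] = '\0';	/* did not fit: drop the partial entry */
		else
			used += (size_t) n;
		missing++;
	}

	/* Pick the singular or plural wording, as errmsg_plural does. */
	if (missing == 1)
		snprintf(buf, buflen,
				 "publication %s does not exist in the publisher", names);
	else if (missing > 1)
		snprintf(buf, buflen,
				 "publications %s do not exist in the publisher", names);

	return missing;
}
```

Called with requested = {"mypub", "non_existent_pub"} and found = {"mypub"}, the function returns 1 and buf holds the singular form of the message, which is the text the first new test in 007_ddl.pl matches against. The patch itself does the same subtraction with list_copy() and list_delete() on the publisher's result rows.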