On Tue, Mar 29, 2022 at 11:02 AM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Sat, Mar 26, 2022 at 7:53 PM vignesh C <vignes...@gmail.com> wrote:
> >
> > The patch was not applying on HEAD, attached patch which is rebased on
> > top of HEAD.
> >
>
> IIUC, this patch provides an option that allows us to give an error if
> while creating/altering subcsiction, user gives non-existant
> publications. I am not sure how useful it is to add such behavior via
> an option especially when we know that it can occur in some other ways
> like after creating the subscription, users can independently drop
> publication from publisher. I think it could be useful to provide
> additional information here but it would be better if we can follow
> Euler's suggestion [1] in the thread where he suggested issuing a
> WARNING if the publications don't exist and document that the
> subscription catalog can have non-existent publications.
>
> I think we should avoid adding new options unless they are really
> required and useful.
>
> [1] - 
> https://www.postgresql.org/message-id/a2f2fba6-40dd-44cc-b40e-58196bb77f1c%40www.fastmail.com

Thanks for the suggestion, I have changed the patch as suggested.
Attached v16 patch has the changes for the same.

Regards,
Vignesh
From 4232956ff31192e903736e921b92aac06243c171 Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignes...@gmail.com>
Date: Sat, 26 Mar 2022 19:43:27 +0530
Subject: [PATCH v16] Identify missing publications from publisher while
 create/alter subscription.

Creating/altering subscription is successful without throwing any warning when
we specify a publication which does not exist in the publisher. This patch
checks if the specified publications are present in the publisher and throws
a warning if any of the publication is missing in the publisher.

Author: Vignesh C
Reviewed-by: Amit Kapila, Bharath Rupireddy, Japin Li, Dilip Kumar, Euler Taveira, Ashutosh Sharma
Discussion: https://www.postgresql.org/message-id/flat/20220321235957.i4jtjn4wyjucex6b%40alap3.anarazel.de#b846aaaafd4ef657cfaa8c9890f044e4
---
 doc/src/sgml/ref/create_subscription.sgml |   7 ++
 src/backend/commands/subscriptioncmds.c   | 132 ++++++++++++++++++----
 src/test/subscription/t/007_ddl.pl        |  61 ++++++++++
 3 files changed, 178 insertions(+), 22 deletions(-)

diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index b701752fc9..761d34116c 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -356,6 +356,13 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
    copied data that would be incompatible with subsequent filtering.
   </para>
 
+  <para>
+   <link linkend="catalog-pg-subscription"><structname>pg_subscription</structname></link>
+   can have non-existent publications if non-existent publication was
+   specified during <command>CREATE SUBSCRIPTION</command> or if an existing
+   publication was removed.
+  </para>
+
  </refsect1>
 
  <refsect1>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index abebffdf3b..6db88f8443 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -375,6 +375,103 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 	}
 }
 
+/*
+ * Add publication names from the list to a string.
+ */
+static void
+get_publications_str(List *publications, StringInfo dest, bool quote_literal)
+{
+	ListCell   *lc;
+	bool		first = true;
+
+	Assert(list_length(publications) > 0);
+
+	foreach(lc, publications)
+	{
+		char	   *pubname = strVal(lfirst(lc));
+
+		if (first)
+			first = false;
+		else
+			appendStringInfoString(dest, ", ");
+
+		if (quote_literal)
+			appendStringInfoString(dest, quote_literal_cstr(pubname));
+		else
+		{
+			appendStringInfoChar(dest, '"');
+			appendStringInfoString(dest, pubname);
+			appendStringInfoChar(dest, '"');
+		}
+	}
+}
+
+/*
+ * Check the specified publication(s) is(are) present in the publisher.
+ */
+static void
+check_publications(WalReceiverConn *wrconn, List *publications)
+{
+	WalRcvExecResult *res;
+	StringInfo 		cmd;
+	TupleTableSlot *slot;
+	List	   *publicationsCopy = NIL;
+	Oid			tableRow[1] = {TEXTOID};
+
+	cmd = makeStringInfo();
+	appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
+								" pg_catalog.pg_publication t WHERE\n"
+								" t.pubname IN (");
+	get_publications_str(publications, cmd, true);
+	appendStringInfoChar(cmd, ')');
+
+	res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
+	pfree(cmd->data);
+	pfree(cmd);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				errmsg_plural("could not receive publication from the publisher: %s",
+							  "could not receive list of publications from the publisher: %s",
+							  list_length(publications),
+							  res->err));
+
+	publicationsCopy = list_copy(publications);
+
+	/* Process publication(s). */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		char	   *pubname;
+		bool		isnull;
+
+		pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		/* Delete the publication present in publisher from the list. */
+		publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
+		ExecClearTuple(slot);
+	}
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	walrcv_clear_result(res);
+
+	if (list_length(publicationsCopy))
+	{
+		/* Prepare the list of non-existent publication(s) for error message. */
+		StringInfo	pubnames = makeStringInfo();
+
+		get_publications_str(publicationsCopy, pubnames, false);
+		ereport(WARNING,
+				errcode(ERRCODE_UNDEFINED_OBJECT),
+				errmsg_plural("publication %s does not exist in the publisher",
+							  "publications %s do not exist in the publisher",
+							  list_length(publicationsCopy),
+							  pubnames->data));
+	}
+}
+
 /*
  * Auxiliary function to build a text array out of a list of String nodes.
  */
@@ -555,6 +652,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
 		PG_TRY();
 		{
+			check_publications(wrconn, publications);
+
 			/*
 			 * Set sync state based on if we were asked to do data copy or
 			 * not.
@@ -650,7 +749,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 }
 
 static void
-AlterSubscription_refresh(Subscription *sub, bool copy_data)
+AlterSubscription_refresh(Subscription *sub, bool copy_data,
+						  bool validate_publication)
 {
 	char	   *err;
 	List	   *pubrel_names;
@@ -681,6 +781,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
 	PG_TRY();
 	{
+		if (validate_publication)
+			check_publications(wrconn, sub->publications);
+
 		/* Get the list of relations from publisher. */
 		pubrel_names = fetch_table_list(wrconn, sub->publications);
 		pubrel_names = list_concat(pubrel_names,
@@ -1048,7 +1151,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					/* Make sure refresh sees the new list of publications. */
 					sub->publications = stmt->publication;
 
-					AlterSubscription_refresh(sub, opts.copy_data);
+					AlterSubscription_refresh(sub, opts.copy_data, true);
 				}
 
 				break;
@@ -1096,7 +1199,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					/* Refresh the new list of publications. */
 					sub->publications = publist;
 
-					AlterSubscription_refresh(sub, opts.copy_data);
+					AlterSubscription_refresh(sub, opts.copy_data, isadd);
 				}
 
 				break;
@@ -1138,7 +1241,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
-				AlterSubscription_refresh(sub, opts.copy_data);
+				AlterSubscription_refresh(sub, opts.copy_data, false);
 
 				break;
 			}
@@ -1659,28 +1762,13 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid			tableRow[2] = {TEXTOID, TEXTOID};
-	ListCell   *lc;
-	bool		first;
 	List	   *tablelist = NIL;
 
-	Assert(list_length(publications) > 0);
-
 	initStringInfo(&cmd);
 	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
-						   "  FROM pg_catalog.pg_publication_tables t\n"
-						   " WHERE t.pubname IN (");
-	first = true;
-	foreach(lc, publications)
-	{
-		char	   *pubname = strVal(lfirst(lc));
-
-		if (first)
-			first = false;
-		else
-			appendStringInfoString(&cmd, ", ");
-
-		appendStringInfoString(&cmd, quote_literal_cstr(pubname));
-	}
+								"  FROM pg_catalog.pg_publication_tables t\n"
+								" WHERE t.pubname IN (");
+	get_publications_str(publications, &cmd, true);
 	appendStringInfoChar(&cmd, ')');
 
 	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
diff --git a/src/test/subscription/t/007_ddl.pl b/src/test/subscription/t/007_ddl.pl
index 1144b005f6..c77ac5623b 100644
--- a/src/test/subscription/t/007_ddl.pl
+++ b/src/test/subscription/t/007_ddl.pl
@@ -41,6 +41,67 @@ COMMIT;
 
 pass "subscription disable and drop in same transaction did not hang";
 
+# One of the specified publications exists.
+my ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION mypub, non_existent_pub"
+);
+ok( $stderr =~
+	  m/WARNING:  publication "non_existent_pub" does not exist in the publisher/,
+	"Create subscription throws warning for non-existent publication");
+
+$node_publisher->wait_for_catchup('mysub1');
+
+# Also wait for initial table sync to finish.
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION mysub1;");
+
+# Specifying multiple non-existent publications.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION non_existent_pub, non_existent_pub1"
+);
+ok( $stderr =~
+	  m/WARNING:  publications "non_existent_pub", "non_existent_pub1" do not exist in the publisher/,
+	"Create subscription throws warning for multiple non-existent publications");
+
+$node_publisher->wait_for_catchup('mysub1');
+
+# Also wait for initial table sync to finish.
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION mysub1;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION mypub;"
+);
+
+$node_publisher->wait_for_catchup('mysub1');
+
+# Also wait for initial table sync to finish.
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Specifying non-existent publication along with add publication.
+($ret, $stdout, $stderr) = $node_subscriber->psql(
+	'postgres',
+	"ALTER SUBSCRIPTION mysub1 ADD PUBLICATION non_existent_pub"
+);
+ok( $stderr =~
+	  m/WARNING:  publication "non_existent_pub" does not exist in the publisher/,
+	"Alter subscription add publication throws warning for non-existent publication");
+
+# Specifying non-existent publication along with set publication.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"ALTER SUBSCRIPTION mysub1 SET PUBLICATION non_existent_pub"
+);
+ok( $stderr =~
+	  m/WARNING:  publication "non_existent_pub" does not exist in the publisher/,
+	"Alter subscription set publication throws warning for non-existent publication");
+
 $node_subscriber->stop;
 $node_publisher->stop;
 
-- 
2.32.0

Reply via email to