On Wed, Mar 30, 2022 at 5:42 PM Bharath Rupireddy
<bharath.rupireddyforpostg...@gmail.com> wrote:
>
> On Wed, Mar 30, 2022 at 5:37 PM Bharath Rupireddy
> <bharath.rupireddyforpostg...@gmail.com> wrote:
> >
> > On Wed, Mar 30, 2022 at 4:29 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > >
> > > On Wed, Mar 30, 2022 at 12:22 PM vignesh C <vignes...@gmail.com> wrote:
> > > >
> > > > I have made the changes for this, attached v17 patch has the changes
> > > > for the same.
> > > >
> > >
> > > The patch looks good to me. I have made minor edits in the comments
> > > and docs. See the attached and let me know what you think? I intend to
> > > commit this tomorrow unless there are more comments or suggestions.
> >
> > I have one minor comment:
> >
> > + "Create subscription throws warning for multiple non-existent 
> > publications");
> > +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION mysub1;");
> > + "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr'
> > PUBLICATION mypub;"
> > + "ALTER SUBSCRIPTION mysub1 ADD PUBLICATION non_existent_pub"
> > + "ALTER SUBSCRIPTION mysub1 SET PUBLICATION non_existent_pub"
> >
> > Why should we drop the subscription mysub1 and create it for ALTER ..
> > ADD and ALTER .. SET tests? Can't we just do below which saves
> > unnecessary subscription creation, drop, wait_for_catchup and
> > poll_query_until?
> >
> > + "Create subscription throws warning for multiple non-existent 
> > publications");
> > + "ALTER SUBSCRIPTION mysub1 ADD PUBLICATION non_existent_pub2"
> > + "ALTER SUBSCRIPTION mysub1 SET PUBLICATION non_existent_pub3"
>
> Or I would even simplify the entire tests as follows:
>
> + "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr'
> PUBLICATION mypub, non_existent_pub1"
> + "Create subscription throws warning for non-existent publication");
> >> no drop of mysub1 >>
> + "CREATE SUBSCRIPTION mysub2 CONNECTION '$publisher_connstr'
> PUBLICATION non_existent_pub1, non_existent_pub2"
> + "Create subscription throws warning for multiple non-existent 
> publications");
> >> no drop of mysub2 >>
> + "ALTER SUBSCRIPTION mysub2 ADD PUBLICATION non_existent_pub3"
> + "Alter subscription add publication throws warning for non-existent
> publication");
> + "ALTER SUBSCRIPTION mysub2 SET PUBLICATION non_existent_pub4"
> + "Alter subscription set publication throws warning for non-existent
> publication");
>  $node_subscriber->stop;
>  $node_publisher->stop;

Your suggestion looks valid, I have modified it as suggested.
Additionally I have removed Create subscription with multiple
non-existent publications and changed add publication with sing
non-existent publication to add publication with multiple non-existent
publications to cover the multiple non-existent publications path.
Attached v19 patch has the changes for the same.

Regards,
Vignesh
From fc175350780ca046e977c9b60d1f79873d19ae7e Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignes...@gmail.com>
Date: Sat, 26 Mar 2022 19:43:27 +0530
Subject: [PATCH v19] Raise WARNING for missing publications.

When we create or alter a subscription to add publications raise a warning
for non-existent publications. We don't want to give an error here because
it is possible that users can later create the missing publications.

Author: Vignesh C
Reviewed-by: Amit Kapila, Bharath Rupireddy, Japin Li, Dilip Kumar, Euler Taveira, Ashutosh Sharma
Discussion: https://www.postgresql.org/message-id/flat/20220321235957.i4jtjn4wyjucex6b%40alap3.anarazel.de#b846aaaafd4ef657cfaa8c9890f044e4
---
 doc/src/sgml/ref/alter_subscription.sgml  |   4 +-
 doc/src/sgml/ref/create_subscription.sgml |   7 ++
 src/backend/commands/subscriptioncmds.c   | 133 ++++++++++++++++++----
 src/test/subscription/t/007_ddl.pl        |  37 ++++++
 4 files changed, 160 insertions(+), 21 deletions(-)

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 3e46bbdb04..fe13ab9a2d 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -114,7 +114,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       replaces the entire list of publications with a new list,
       <literal>ADD</literal> adds additional publications to the list of
       publications, and <literal>DROP</literal> removes the publications from
-      the list of publications.  See <xref linkend="sql-createsubscription"/>
+      the list of publications.  We allow non-existent publications to be
+      specified in <literal>ADD</literal> and <literal>SET</literal> variants
+      so that users can add those later.  See <xref linkend="sql-createsubscription"/>
       for more information.  By default, this command will also act like
       <literal>REFRESH PUBLICATION</literal>.
      </para>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index b701752fc9..ebf7db57c5 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -356,6 +356,13 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
    copied data that would be incompatible with subsequent filtering.
   </para>
 
+  <para>
+   We allow non-existent publications to be specified so that users can add
+   those later. This means
+   <link linkend="catalog-pg-subscription"><structname>pg_subscription</structname></link>
+   can have non-existent publications.
+  </para>
+
  </refsect1>
 
  <refsect1>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index abebffdf3b..85dacbe93d 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -375,6 +375,103 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 	}
 }
 
+/*
+ * Add publication names from the list to a string.
+ */
+static void
+get_publications_str(List *publications, StringInfo dest, bool quote_literal)
+{
+	ListCell   *lc;
+	bool		first = true;
+
+	Assert(list_length(publications) > 0);
+
+	foreach(lc, publications)
+	{
+		char	   *pubname = strVal(lfirst(lc));
+
+		if (first)
+			first = false;
+		else
+			appendStringInfoString(dest, ", ");
+
+		if (quote_literal)
+			appendStringInfoString(dest, quote_literal_cstr(pubname));
+		else
+		{
+			appendStringInfoChar(dest, '"');
+			appendStringInfoString(dest, pubname);
+			appendStringInfoChar(dest, '"');
+		}
+	}
+}
+
+/*
+ * Check the specified publication(s) is(are) present in the publisher.
+ */
+static void
+check_publications(WalReceiverConn *wrconn, List *publications)
+{
+	WalRcvExecResult *res;
+	StringInfo	cmd;
+	TupleTableSlot *slot;
+	List	   *publicationsCopy = NIL;
+	Oid			tableRow[1] = {TEXTOID};
+
+	cmd = makeStringInfo();
+	appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
+						   " pg_catalog.pg_publication t WHERE\n"
+						   " t.pubname IN (");
+	get_publications_str(publications, cmd, true);
+	appendStringInfoChar(cmd, ')');
+
+	res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
+	pfree(cmd->data);
+	pfree(cmd);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				errmsg_plural("could not receive publication from the publisher: %s",
+							  "could not receive list of publications from the publisher: %s",
+							  list_length(publications),
+							  res->err));
+
+	publicationsCopy = list_copy(publications);
+
+	/* Process publication(s). */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		char	   *pubname;
+		bool		isnull;
+
+		pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		/* Delete the publication present in publisher from the list. */
+		publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
+		ExecClearTuple(slot);
+	}
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	walrcv_clear_result(res);
+
+	if (list_length(publicationsCopy))
+	{
+		/* Prepare the list of non-existent publication(s) for error message. */
+		StringInfo	pubnames = makeStringInfo();
+
+		get_publications_str(publicationsCopy, pubnames, false);
+		ereport(WARNING,
+				errcode(ERRCODE_UNDEFINED_OBJECT),
+				errmsg_plural("publication %s does not exist in the publisher",
+							  "publications %s do not exist in the publisher",
+							  list_length(publicationsCopy),
+							  pubnames->data));
+	}
+}
+
 /*
  * Auxiliary function to build a text array out of a list of String nodes.
  */
@@ -555,6 +652,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
 		PG_TRY();
 		{
+			check_publications(wrconn, publications);
+
 			/*
 			 * Set sync state based on if we were asked to do data copy or
 			 * not.
@@ -650,7 +749,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 }
 
 static void
-AlterSubscription_refresh(Subscription *sub, bool copy_data)
+AlterSubscription_refresh(Subscription *sub, bool copy_data,
+						  List *validate_publications)
 {
 	char	   *err;
 	List	   *pubrel_names;
@@ -681,6 +781,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
 	PG_TRY();
 	{
+		if (validate_publications)
+			check_publications(wrconn, validate_publications);
+
 		/* Get the list of relations from publisher. */
 		pubrel_names = fetch_table_list(wrconn, sub->publications);
 		pubrel_names = list_concat(pubrel_names,
@@ -1048,7 +1151,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					/* Make sure refresh sees the new list of publications. */
 					sub->publications = stmt->publication;
 
-					AlterSubscription_refresh(sub, opts.copy_data);
+					AlterSubscription_refresh(sub, opts.copy_data,
+											  stmt->publication);
 				}
 
 				break;
@@ -1074,6 +1178,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				/* Refresh if user asked us to. */
 				if (opts.refresh)
 				{
+					/* We only need to validate user specified publications. */
+					List	   *validate_publications = (isadd) ? stmt->publication : NULL;
+
 					if (!sub->enabled)
 						ereport(ERROR,
 								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1096,7 +1203,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					/* Refresh the new list of publications. */
 					sub->publications = publist;
 
-					AlterSubscription_refresh(sub, opts.copy_data);
+					AlterSubscription_refresh(sub, opts.copy_data,
+											  validate_publications);
 				}
 
 				break;
@@ -1138,7 +1246,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
-				AlterSubscription_refresh(sub, opts.copy_data);
+				AlterSubscription_refresh(sub, opts.copy_data, NULL);
 
 				break;
 			}
@@ -1659,28 +1767,13 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid			tableRow[2] = {TEXTOID, TEXTOID};
-	ListCell   *lc;
-	bool		first;
 	List	   *tablelist = NIL;
 
-	Assert(list_length(publications) > 0);
-
 	initStringInfo(&cmd);
 	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
 						   "  FROM pg_catalog.pg_publication_tables t\n"
 						   " WHERE t.pubname IN (");
-	first = true;
-	foreach(lc, publications)
-	{
-		char	   *pubname = strVal(lfirst(lc));
-
-		if (first)
-			first = false;
-		else
-			appendStringInfoString(&cmd, ", ");
-
-		appendStringInfoString(&cmd, quote_literal_cstr(pubname));
-	}
+	get_publications_str(publications, &cmd, true);
 	appendStringInfoChar(&cmd, ')');
 
 	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
diff --git a/src/test/subscription/t/007_ddl.pl b/src/test/subscription/t/007_ddl.pl
index 1144b005f6..39c32eda44 100644
--- a/src/test/subscription/t/007_ddl.pl
+++ b/src/test/subscription/t/007_ddl.pl
@@ -41,6 +41,43 @@ COMMIT;
 
 pass "subscription disable and drop in same transaction did not hang";
 
+# One of the specified publications exists.
+my ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION mypub, non_existent_pub"
+);
+ok( $stderr =~
+	  m/WARNING:  publication "non_existent_pub" does not exist in the publisher/,
+	"Create subscription throws warning for non-existent publication");
+
+$node_publisher->wait_for_catchup('mysub1');
+
+# Also wait for initial table sync to finish.
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Also wait for initial table sync to finish.
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Specifying non-existent publication along with add publication.
+($ret, $stdout, $stderr) = $node_subscriber->psql(
+	'postgres',
+	"ALTER SUBSCRIPTION mysub1 ADD PUBLICATION non_existent_pub1, non_existent_pub2"
+);
+ok( $stderr =~
+	  m/WARNING:  publications "non_existent_pub1", "non_existent_pub2" do not exist in the publisher/,
+	"Alter subscription add publication throws warning for non-existent publications");
+
+# Specifying non-existent publication along with set publication.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"ALTER SUBSCRIPTION mysub1 SET PUBLICATION non_existent_pub"
+);
+ok( $stderr =~
+	  m/WARNING:  publication "non_existent_pub" does not exist in the publisher/,
+	"Alter subscription set publication throws warning for non-existent publication");
+
 $node_subscriber->stop;
 $node_publisher->stop;
 
-- 
2.32.0

Reply via email to