>From d242be3dc95443f226a906f108c52d32b9ac3e1e Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Fri, 5 May 2017 18:14:00 +0200
Subject: [PATCH] Remove the NODROP SLOT option from DROP SUBSCRIPTION

---
 doc/src/sgml/ref/create_subscription.sgml          |   7 +
 doc/src/sgml/ref/drop_subscription.sgml            |  22 +--
 src/backend/catalog/pg_subscription.c              |   9 +-
 src/backend/commands/subscriptioncmds.c            | 195 ++++++++++++++++-----
 src/backend/nodes/copyfuncs.c                      |   2 +-
 src/backend/nodes/equalfuncs.c                     |   2 +-
 src/backend/parser/gram.y                          |  43 ++---
 src/backend/replication/logical/worker.c           |  35 ++--
 src/backend/tcop/utility.c                         |   3 +
 src/include/catalog/pg_subscription.h              |   2 +-
 src/include/nodes/parsenodes.h                     |   2 +-
 .../dummy_seclabel/expected/dummy_seclabel.out     |   4 +-
 .../modules/dummy_seclabel/sql/dummy_seclabel.sql  |   4 +-
 src/test/regress/expected/object_address.out       |   4 +-
 src/test/regress/expected/subscription.out         |  13 +-
 src/test/regress/sql/object_address.sql            |   4 +-
 src/test/regress/sql/subscription.sql              |  12 +-
 src/test/subscription/t/001_rep_changes.pl         |   2 +-
 src/test/subscription/t/004_sync.pl                |   8 +-
 19 files changed, 241 insertions(+), 132 deletions(-)

diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index fcec254..c9256e7 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -129,6 +129,13 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl
           Name of the replication slot to use. The default behavior is to
           use <literal>subscription_name</> for slot name.
          </para>
+         <para>
+          When <literal>slot_name</literal> is set to
+          <literal>NONE</literal> there will be no slot associated with the
+          subscription. Such subscriptions must also have both
+          <literal>enabled</literal> and <literal>create_slot</literal> set
+          to <literal>false</literal>.
+         </para>
         </listitem>
        </varlistentry>
 
diff --git a/doc/src/sgml/ref/drop_subscription.sgml b/doc/src/sgml/ref/drop_subscription.sgml
index f1ac125..8b00c45 100644
--- a/doc/src/sgml/ref/drop_subscription.sgml
+++ b/doc/src/sgml/ref/drop_subscription.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-DROP SUBSCRIPTION [ IF EXISTS ] <replaceable class="parameter">name</replaceable> [ DROP SLOT | NODROP SLOT ]
+DROP SUBSCRIPTION [ IF EXISTS ] <replaceable class="parameter">name</replaceable> [ CASCADE | RESTRICT ]
 </synopsis>
  </refsynopsisdiv>
 
@@ -57,20 +57,16 @@ DROP SUBSCRIPTION [ IF EXISTS ] <replaceable class="parameter">name</replaceable
    </varlistentry>
 
    <varlistentry>
-    <term><literal>DROP SLOT</literal></term>
-    <term><literal>NODROP SLOT</literal></term>
-    <listitem>
-     <para>
-      Specifies whether to drop the replication slot on the publisher.  The
-      default is
-      <literal>DROP SLOT</literal>.
-     </para>
+    <term><literal>CASCADE</literal></term>
+    <term><literal>RESTRICT</literal></term>
 
+    <listitem>
      <para>
-      If the publisher is not reachable when the subscription is to be
-      dropped, then it is useful to specify <literal>NODROP SLOT</literal>.
-      But the replication slot on the publisher will then have to be removed
-      manually.
+      These key words are used to determine what to do with when there is a
+      replication slot associated with the subscription. The
+      <literal>RESTRICT</literal> will refuse to drop the subscription in
+      such case, while <literal>CASCADE</literal> will drop the associated
+      slot. <literal>RESTRICT</literal> is the default.
      </para>
     </listitem>
    </varlistentry>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 22587a4..7dc21f1 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -82,8 +82,10 @@ GetSubscription(Oid subid, bool missing_ok)
 							tup,
 							Anum_pg_subscription_subslotname,
 							&isnull);
-	Assert(!isnull);
-	sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
+	if (!isnull)
+		sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
+	else
+		sub->slotname = NULL;
 
 	/* Get synccommit */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
@@ -147,7 +149,8 @@ FreeSubscription(Subscription *sub)
 {
 	pfree(sub->name);
 	pfree(sub->conninfo);
-	pfree(sub->slotname);
+	if (sub->slotname)
+		pfree(sub->slotname);
 	list_free_deep(sub->publications);
 	pfree(sub);
 }
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index c00981e..afb116a 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -60,7 +60,8 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
  */
 static void
 parse_subscription_options(List *options, bool *connect, bool *enabled_given,
-						   bool *enabled, bool *create_slot, char **slot_name,
+						   bool *enabled, bool *create_slot,
+						   bool *slot_name_given, char **slot_name,
 						   bool *copy_data, char **synchronous_commit)
 {
 	ListCell   *lc;
@@ -78,7 +79,10 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 	if (create_slot)
 		*create_slot = true;
 	if (slot_name)
+	{
+		*slot_name_given = false;
 		*slot_name = NULL;
+	}
 	if (copy_data)
 		*copy_data = true;
 	if (synchronous_commit)
@@ -121,12 +125,17 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 		}
 		else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
 		{
-			if (*slot_name)
+			if (*slot_name_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
+			*slot_name_given = true;
 			*slot_name = defGetString(defel);
+
+			/* Setting slot_name = NONE is treated as no slot name. */
+			if (strcmp(*slot_name, "none") == 0)
+				*slot_name = NULL;
 		}
 		else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
 		{
@@ -164,26 +173,43 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 	if (connect && !*connect)
 	{
 		/* Check for incompatible options from the user. */
-		if (*enabled_given && *enabled)
+		if (enabled && *enabled_given && *enabled)
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
-					 errmsg("noconnect and enabled are mutually exclusive options")));
+					 errmsg("connect = false and enabled are mutually exclusive options")));
 
-		if (create_slot_given && *create_slot)
+		if (create_slot && create_slot_given && *create_slot)
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
-					 errmsg("noconnect and create slot are mutually exclusive options")));
+					 errmsg("connect = false and create_slot are mutually exclusive options")));
 
-		if (copy_data_given && *copy_data)
+		if (copy_data && copy_data_given && *copy_data)
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
-					 errmsg("noconnect and copy data are mutually exclusive options")));
+					 errmsg("connect = false and copy_data are mutually exclusive options")));
 
 		/* Change the defaults of other options. */
 		*enabled = false;
 		*create_slot = false;
 		*copy_data = false;
 	}
+
+	/*
+	 * Do additional checking for disallowed combination when
+	 * slot_name = NONE was used.
+	 */
+	if (slot_name && *slot_name_given && !*slot_name)
+	{
+		if (enabled && *enabled_given && *enabled)
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("slot_name = NONE and enabled are mutually exclusive options")));
+
+		if (create_slot && create_slot_given && *create_slot)
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("slot_name = NONE and create_slot are mutually exclusive options")));
+	}
 }
 
 /*
@@ -260,6 +286,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	char	   *synchronous_commit;
 	char	   *conninfo;
 	char	   *slotname;
+	bool		slotname_given;
 	char		originname[NAMEDATALEN];
 	bool		create_slot;
 	List	   *publications;
@@ -269,8 +296,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	 * Connection and publication should not be specified here.
 	 */
 	parse_subscription_options(stmt->options, &connect, &enabled_given,
-							   &enabled, &create_slot, &slotname, &copy_data,
-							   &synchronous_commit);
+							   &enabled, &create_slot, &slotname_given,
+							   &slotname, &copy_data, &synchronous_commit);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -299,8 +326,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 						stmt->subname)));
 	}
 
-	if (slotname == NULL)
+	if (!slotname_given && slotname == NULL)
 		slotname = stmt->subname;
+
 	/* The default for synchronous_commit of subscriptions is off. */
 	if (synchronous_commit == NULL)
 		synchronous_commit = "off";
@@ -325,8 +353,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
-	values[Anum_pg_subscription_subslotname - 1] =
-		DirectFunctionCall1(namein, CStringGetDatum(slotname));
+	if (slotname)
+		values[Anum_pg_subscription_subslotname - 1] =
+			DirectFunctionCall1(namein, CStringGetDatum(slotname));
+	else
+		nulls[Anum_pg_subscription_subslotname - 1] = true;
 	values[Anum_pg_subscription_subsynccommit - 1] =
 		CStringGetTextDatum(synchronous_commit);
 	values[Anum_pg_subscription_subpublications - 1] =
@@ -396,6 +427,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 			 */
 			if (create_slot)
 			{
+				Assert(slotname);
+
 				walrcv_create_slot(wrconn, slotname, false,
 								   CRS_NOEXPORT_SNAPSHOT, &lsn);
 				ereport(NOTICE,
@@ -548,6 +581,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 	HeapTuple	tup;
 	Oid			subid;
 	bool		update_tuple = false;
+	Subscription   *sub;
 
 	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
 
@@ -567,6 +601,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 					   stmt->subname);
 
 	subid = HeapTupleGetOid(tup);
+	sub = GetSubscription(subid, false);
 
 	/* Form a new tuple. */
 	memset(values, 0, sizeof(values));
@@ -577,19 +612,29 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 	{
 		case ALTER_SUBSCRIPTION_OPTIONS:
 			{
-				char *slot_name;
-				char	   *synchronous_commit;
+				char   *slotname;
+				bool	slotname_given;
+				char   *synchronous_commit;
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
-										   NULL, &slot_name, NULL,
-										   &synchronous_commit);
+										   NULL, &slotname_given, &slotname,
+										   NULL, &synchronous_commit);
 
-				if (slot_name)
+				if (slotname_given)
 				{
-					values[Anum_pg_subscription_subslotname - 1] =
-						DirectFunctionCall1(namein, CStringGetDatum(slot_name));
+					if (sub->enabled && !slotname)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("cannot set slot_name = NONE for enabled subscription")));
+
+					if (slotname)
+						values[Anum_pg_subscription_subslotname - 1] =
+						DirectFunctionCall1(namein, CStringGetDatum(slotname));
+					else
+						nulls[Anum_pg_subscription_subslotname - 1] = true;
 					replaces[Anum_pg_subscription_subslotname - 1] = true;
 				}
+
 				if (synchronous_commit)
 				{
 					values[Anum_pg_subscription_subsynccommit - 1] =
@@ -608,9 +653,14 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL,
 										   &enabled_given, &enabled, NULL,
-										   NULL, NULL, NULL);
+										   NULL, NULL, NULL, NULL);
 				Assert(enabled_given);
 
+				if (!sub->slotname && enabled)
+					ereport(ERROR,
+							(errcode(ERRCODE_SYNTAX_ERROR),
+							 errmsg("cannot enable subscription which does not have a slot_name")));
+
 				values[Anum_pg_subscription_subenabled - 1] =
 					BoolGetDatum(enabled);
 				replaces[Anum_pg_subscription_subenabled - 1] = true;
@@ -633,10 +683,10 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 		case ALTER_SUBSCRIPTION_PUBLICATION_REFRESH:
 			{
 				bool			copy_data;
-				Subscription   *sub = GetSubscription(subid, false);
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
-										   NULL, NULL, &copy_data, NULL);
+										   NULL, NULL, NULL, &copy_data,
+										   NULL);
 
 				values[Anum_pg_subscription_subpublications - 1] =
 					 publicationListToArray(stmt->publication);
@@ -647,6 +697,11 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				/* Refresh if user asked us to. */
 				if (stmt->kind == ALTER_SUBSCRIPTION_PUBLICATION_REFRESH)
 				{
+					if (!sub->enabled)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
+
 					/* Make sure refresh sees the new list of publications. */
 					sub->publications = stmt->publication;
 
@@ -659,10 +714,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 		case ALTER_SUBSCRIPTION_REFRESH:
 			{
 				bool			copy_data;
-				Subscription   *sub = GetSubscription(subid, false);
+
+				if (!sub->enabled)
+					ereport(ERROR,
+							(errcode(ERRCODE_SYNTAX_ERROR),
+							 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
-										   NULL, NULL, &copy_data, NULL);
+										   NULL, NULL, NULL, &copy_data,
+										   NULL);
 
 				AlterSubscription_refresh(sub, copy_data);
 
@@ -721,8 +781,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * run DROP SUBSCRIPTION inside a transaction block if dropping the
 	 * replication slot.
 	 */
-	if (stmt->drop_slot)
-		PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION ... DROP SLOT");
+	if (stmt->behavior == DROP_CASCADE)
+		PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION ... CASCADE");
 
 	/*
 	 * Lock pg_subscription with AccessExclusiveLock to ensure
@@ -782,8 +842,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	/* Get slotname */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
 							Anum_pg_subscription_subslotname, &isnull);
-	Assert(!isnull);
-	slotname = pstrdup(NameStr(*DatumGetName(datum)));
+	if (!isnull)
+		slotname = pstrdup(NameStr(*DatumGetName(datum)));
+	else
+		slotname = NULL;
 
 	ObjectAddressSet(myself, SubscriptionRelationId, subid);
 	EventTriggerSQLDropAddObject(&myself, true, true);
@@ -808,43 +870,84 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	if (originid != InvalidRepOriginId)
 		replorigin_drop(originid);
 
-	/* If the user asked to not drop the slot, we are done mow.*/
-	if (!stmt->drop_slot)
+	/* If there is no slot associated with subscription we can finish here. */
+	if (!slotname)
 	{
 		heap_close(rel, NoLock);
 		return;
 	}
 
 	/*
-	 * Otherwise drop the replication slot at the publisher node using
+	 * Otherwise check for the replication slot at the publisher node using
 	 * the replication connection.
 	 */
 	load_file("libpqwalreceiver", false);
 
-	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT \"%s\"", slotname);
-
 	wrconn = walrcv_connect(conninfo, true, subname, &err);
 	if (wrconn == NULL)
 		ereport(ERROR,
-				(errmsg("could not connect to publisher when attempting to "
-						"drop the replication slot \"%s\"", slotname),
-				 errdetail("The error was: %s", err)));
+				(errmsg("could not connect to publisher when attempting to fetch "
+						"information about replication slot \"%s\"", slotname),
+				 errdetail("The error was: %s", err),
+				 errhint("Use ALTER SUBSCRIPTION ... WITH (slot_name = NONE) "
+						 "to disassociate the subscription from slot.")));
 
 	PG_TRY();
 	{
 		WalRcvExecResult   *res;
-		res = walrcv_exec(wrconn, cmd.data, 0, NULL);
-
-		if (res->status != WALRCV_OK_COMMAND)
+		TupleTableSlot	   *tupslot;
+		Oid					slot_row_desc[1] = {BOOLOID};
+		bool				found;
+
+		initStringInfo(&cmd);
+		appendStringInfo(&cmd,
+						 "SELECT true FROM pg_catalog.pg_replication_slots WHERE slot_name = %s",
+						 quote_literal_cstr(slotname));
+		res = walrcv_exec(wrconn, cmd.data, 1, slot_row_desc);
+		if (res->status != WALRCV_OK_TUPLES)
 			ereport(ERROR,
-					(errmsg("could not drop the replication slot \"%s\" on publisher",
-							slotname),
+					(errmsg("could not fetch the replication slot info from publisher"),
 					 errdetail("The error was: %s", res->err)));
+
+		tupslot = MakeSingleTupleTableSlot(res->tupledesc);
+		found = tuplestore_gettupleslot(res->tuplestore, true, false,
+										tupslot);
+		ExecDropSingleTupleTableSlot(tupslot);
+		walrcv_clear_result(res);
+
+		/* If slot was not found on publisher, we are done. */
+		if (!found)
+		{
+			walrcv_disconnect(wrconn);
+			pfree(cmd.data);
+			heap_close(rel, NoLock);
+			return;
+		}
+
+		/* Otherwise the next action depends on the drop_behavior. */
+		if (stmt->behavior == DROP_CASCADE)
+		{
+			resetStringInfo(&cmd);
+			appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s",
+							 quote_identifier(slotname));
+			res = walrcv_exec(wrconn, cmd.data, 0, NULL);
+
+			if (res->status != WALRCV_OK_COMMAND)
+				ereport(ERROR,
+						(errmsg("could not drop the replication slot \"%s\" on publisher",
+								slotname),
+						 errdetail("The error was: %s", res->err)));
+			else
+				ereport(NOTICE,
+						(errmsg("dropped replication slot \"%s\" on publisher",
+								slotname)));
+		}
 		else
-			ereport(NOTICE,
-					(errmsg("dropped replication slot \"%s\" on publisher",
-							slotname)));
+			ereport(ERROR,
+					(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
+					 errmsg("cannot drop subscription \"%s\" because there is still replication slot associated with it",
+							subname),
+					 errhint("Use DROP ... CASCADE to drop the slot too.")));
 
 		walrcv_clear_result(res);
 	}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 35a237a..2d2a9d0 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4537,8 +4537,8 @@ _copyDropSubscriptionStmt(const DropSubscriptionStmt *from)
 	DropSubscriptionStmt *newnode = makeNode(DropSubscriptionStmt);
 
 	COPY_STRING_FIELD(subname);
-	COPY_SCALAR_FIELD(drop_slot);
 	COPY_SCALAR_FIELD(missing_ok);
+	COPY_SCALAR_FIELD(behavior);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 21dfbb0..b5459cd 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2246,8 +2246,8 @@ _equalDropSubscriptionStmt(const DropSubscriptionStmt *a,
 						   const DropSubscriptionStmt *b)
 {
 	COMPARE_STRING_FIELD(subname);
-	COMPARE_SCALAR_FIELD(drop_slot);
 	COMPARE_SCALAR_FIELD(missing_ok);
+	COMPARE_SCALAR_FIELD(behavior);
 
 	return true;
 }
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 732647b..bdd2f6d 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -415,7 +415,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <fun_param_mode> arg_class
 %type <typnam>	func_return func_type
 
-%type <boolean>  opt_trusted opt_restart_seqs opt_drop_slot
+%type <boolean>  opt_trusted opt_restart_seqs
 %type <ival>	 OptTemp
 %type <ival>	 OptNoLog
 %type <oncommit> OnCommitOption
@@ -467,7 +467,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <node>	def_arg columnElem where_clause where_or_current_clause
 				a_expr b_expr c_expr AexprConst indirection_el opt_slice_bound
 				columnref in_expr having_clause func_table xmltable array_expr
-				ExclusionWhereClause
+				ExclusionWhereClause operator_def_arg
 %type <list>	rowsfrom_item rowsfrom_list opt_col_def_list
 %type <boolean> opt_ordinality
 %type <list>	ExclusionConstraintList ExclusionConstraintElem
@@ -5684,6 +5684,7 @@ def_arg:	func_type						{ $$ = (Node *)$1; }
 			| qual_all_Op					{ $$ = (Node *)$1; }
 			| NumericOnly					{ $$ = (Node *)$1; }
 			| Sconst						{ $$ = (Node *)makeString($1); }
+			| NONE							{ $$ = (Node *)makeString(pstrdup($1)); }
 		;
 
 old_aggr_definition: '(' old_aggr_list ')'			{ $$ = $2; }
@@ -8923,8 +8924,16 @@ operator_def_list:	operator_def_elem								{ $$ = list_make1($1); }
 
 operator_def_elem: ColLabel '=' NONE
 						{ $$ = makeDefElem($1, NULL, @1); }
-				   | ColLabel '=' def_arg
-					   { $$ = makeDefElem($1, (Node *) $3, @1); }
+				   | ColLabel '='operator_def_arg
+						{ $$ = makeDefElem($1, (Node *) $3, @1); }
+		;
+
+operator_def_arg:
+			func_type						{ $$ = (Node *)$1; }
+			| reserved_keyword				{ $$ = (Node *)makeString(pstrdup($1)); }
+			| qual_all_Op					{ $$ = (Node *)$1; }
+			| NumericOnly					{ $$ = (Node *)$1; }
+			| Sconst						{ $$ = (Node *)makeString($1); }
 		;
 
 /*****************************************************************************
@@ -9315,42 +9324,24 @@ AlterSubscriptionStmt:
  *
  *****************************************************************************/
 
-DropSubscriptionStmt: DROP SUBSCRIPTION name opt_drop_slot
+DropSubscriptionStmt: DROP SUBSCRIPTION name opt_drop_behavior
 				{
 					DropSubscriptionStmt *n = makeNode(DropSubscriptionStmt);
 					n->subname = $3;
-					n->drop_slot = $4;
 					n->missing_ok = false;
+					n->behavior = $4;
 					$$ = (Node *) n;
 				}
-				|  DROP SUBSCRIPTION IF_P EXISTS name opt_drop_slot
+				|  DROP SUBSCRIPTION IF_P EXISTS name opt_drop_behavior
 				{
 					DropSubscriptionStmt *n = makeNode(DropSubscriptionStmt);
 					n->subname = $5;
-					n->drop_slot = $6;
 					n->missing_ok = true;
+					n->behavior = $6;
 					$$ = (Node *) n;
 				}
 		;
 
-opt_drop_slot:
-			DROP SLOT
-				{
-					$$ = TRUE;
-				}
-			| IDENT SLOT
-				{
-					if (strcmp($1, "nodrop") == 0)
-						$$ = FALSE;
-					else
-						ereport(ERROR,
-								(errcode(ERRCODE_SYNTAX_ERROR),
-								 errmsg("unrecognized option \"%s\"", $1),
-										parser_errposition(@1)));
-				}
-			| /*EMPTY*/								{ $$ = TRUE; }
-		;
-
 /*****************************************************************************
  *
  *		QUERY:	Define Rewrite Rule
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 2d7770d..a4b7dcb 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1329,6 +1329,22 @@ reread_subscription(void)
 	}
 
 	/*
+	 * Exit if the subscription was disabled.
+	 * This normally should not happen as the worker gets killed
+	 * during ALTER SUBSCRIPTION ... DISABLE.
+	 */
+	if (!newsub->enabled)
+	{
+		ereport(LOG,
+				(errmsg("logical replication worker for subscription \"%s\" will "
+						"stop because the subscription was disabled",
+						MySubscription->name)));
+
+		walrcv_disconnect(wrconn);
+		proc_exit(0);
+	}
+
+	/*
 	 * Exit if connection string was changed. The launcher will start
 	 * new worker.
 	 */
@@ -1358,6 +1374,9 @@ reread_subscription(void)
 		proc_exit(0);
 	}
 
+	/* !slotname should never happen when enabled is true. */
+	Assert(newsub->slotname);
+
 	/*
 	 * We need to make new connection to new slot if slot name has changed
 	 * so exit here as well if that's the case.
@@ -1388,22 +1407,6 @@ reread_subscription(void)
 		proc_exit(0);
 	}
 
-	/*
-	 * Exit if the subscription was disabled.
-	 * This normally should not happen as the worker gets killed
-	 * during ALTER SUBSCRIPTION ... DISABLE.
-	 */
-	if (!newsub->enabled)
-	{
-		ereport(LOG,
-				(errmsg("logical replication worker for subscription \"%s\" will "
-						"stop because the subscription was disabled",
-						MySubscription->name)));
-
-		walrcv_disconnect(wrconn);
-		proc_exit(0);
-	}
-
 	/* Check for other changes that should never happen too. */
 	if (newsub->dbid != MySubscription->dbid)
 	{
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 24e5c42..d4fa5a7 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -2303,6 +2303,9 @@ CreateCommandTag(Node *parsetree)
 				case OBJECT_PUBLICATION:
 					tag = "DROP PUBLICATION";
 					break;
+				case OBJECT_SUBSCRIPTION:
+					tag = "DROP SUBSCRIPTION";
+					break;
 				case OBJECT_STATISTIC_EXT:
 					tag = "DROP STATISTICS";
 					break;
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 5550f19..d4f3979 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -45,7 +45,7 @@ CATALOG(pg_subscription,6100) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6101) BKI_SCHE
 	text		subconninfo BKI_FORCE_NOT_NULL;
 
 	/* Slot name on publisher */
-	NameData	subslotname BKI_FORCE_NOT_NULL;
+	NameData	subslotname;
 
 	/* Synchronous commit setting for worker */
 	text		subsynccommit BKI_FORCE_NOT_NULL;
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index e1d454a..46c23c2 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3393,8 +3393,8 @@ typedef struct DropSubscriptionStmt
 {
 	NodeTag		type;
 	char	   *subname;		/* Name of of the subscription */
-	bool		drop_slot;		/* Should we drop the slot on remote side? */
 	bool		missing_ok;		/* Skip error if missing? */
+	DropBehavior behavior;		/* RESTRICT or CASCADE behavior */
 } DropSubscriptionStmt;
 
 #endif   /* PARSENODES_H */
diff --git a/src/test/modules/dummy_seclabel/expected/dummy_seclabel.out b/src/test/modules/dummy_seclabel/expected/dummy_seclabel.out
index f6fc29a..77bdc93 100644
--- a/src/test/modules/dummy_seclabel/expected/dummy_seclabel.out
+++ b/src/test/modules/dummy_seclabel/expected/dummy_seclabel.out
@@ -69,7 +69,7 @@ CREATE SCHEMA dummy_seclabel_test;
 SECURITY LABEL ON SCHEMA dummy_seclabel_test IS 'unclassified';		-- OK
 SET client_min_messages = error;
 CREATE PUBLICATION dummy_pub;
-CREATE SUBSCRIPTION dummy_sub CONNECTION '' PUBLICATION foo WITH (connect = false);
+CREATE SUBSCRIPTION dummy_sub CONNECTION '' PUBLICATION foo WITH (connect = false, slot_name = NONE);
 RESET client_min_messages;
 SECURITY LABEL ON PUBLICATION dummy_pub IS 'classified';
 SECURITY LABEL ON SUBSCRIPTION dummy_sub IS 'classified';
@@ -111,7 +111,7 @@ NOTICE:  event ddl_command_end: SECURITY LABEL
 DROP EVENT TRIGGER always_start, always_end, always_drop, always_rewrite;
 DROP VIEW dummy_seclabel_view1;
 DROP TABLE dummy_seclabel_tbl1, dummy_seclabel_tbl2;
-DROP SUBSCRIPTION dummy_sub NODROP SLOT;
+DROP SUBSCRIPTION dummy_sub;
 DROP PUBLICATION dummy_pub;
 DROP ROLE regress_dummy_seclabel_user1;
 DROP ROLE regress_dummy_seclabel_user2;
diff --git a/src/test/modules/dummy_seclabel/sql/dummy_seclabel.sql b/src/test/modules/dummy_seclabel/sql/dummy_seclabel.sql
index d7795bd..8c347b6 100644
--- a/src/test/modules/dummy_seclabel/sql/dummy_seclabel.sql
+++ b/src/test/modules/dummy_seclabel/sql/dummy_seclabel.sql
@@ -73,7 +73,7 @@ SECURITY LABEL ON SCHEMA dummy_seclabel_test IS 'unclassified';		-- OK
 
 SET client_min_messages = error;
 CREATE PUBLICATION dummy_pub;
-CREATE SUBSCRIPTION dummy_sub CONNECTION '' PUBLICATION foo WITH (connect = false);
+CREATE SUBSCRIPTION dummy_sub CONNECTION '' PUBLICATION foo WITH (connect = false, slot_name = NONE);
 RESET client_min_messages;
 SECURITY LABEL ON PUBLICATION dummy_pub IS 'classified';
 SECURITY LABEL ON SUBSCRIPTION dummy_sub IS 'classified';
@@ -108,7 +108,7 @@ DROP EVENT TRIGGER always_start, always_end, always_drop, always_rewrite;
 DROP VIEW dummy_seclabel_view1;
 DROP TABLE dummy_seclabel_tbl1, dummy_seclabel_tbl2;
 
-DROP SUBSCRIPTION dummy_sub NODROP SLOT;
+DROP SUBSCRIPTION dummy_sub;
 DROP PUBLICATION dummy_pub;
 
 DROP ROLE regress_dummy_seclabel_user1;
diff --git a/src/test/regress/expected/object_address.out b/src/test/regress/expected/object_address.out
index 0849715..700f261 100644
--- a/src/test/regress/expected/object_address.out
+++ b/src/test/regress/expected/object_address.out
@@ -37,7 +37,7 @@ CREATE TRANSFORM FOR int LANGUAGE SQL (
 	FROM SQL WITH FUNCTION varchar_transform(internal),
 	TO SQL WITH FUNCTION int4recv(internal));
 CREATE PUBLICATION addr_pub FOR TABLE addr_nsp.gentable;
-CREATE SUBSCRIPTION addr_sub CONNECTION '' PUBLICATION bar WITH (connect = false);
+CREATE SUBSCRIPTION addr_sub CONNECTION '' PUBLICATION bar WITH (connect = false, slot_name = NONE);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 CREATE STATISTICS addr_nsp.gentable_stat ON (a,b) FROM addr_nsp.gentable;
 -- test some error cases
@@ -477,7 +477,7 @@ SELECT (pg_identify_object(addr1.classid, addr1.objid, addr1.objsubid)).*,
 SET client_min_messages TO 'warning';
 DROP FOREIGN DATA WRAPPER addr_fdw CASCADE;
 DROP PUBLICATION addr_pub;
-DROP SUBSCRIPTION addr_sub NODROP SLOT;
+DROP SUBSCRIPTION addr_sub;
 DROP SCHEMA addr_nsp CASCADE;
 DROP OWNED BY regress_addr_user;
 DROP USER regress_addr_user;
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 33d53de..fcf5646 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -109,17 +109,18 @@ HINT:  The owner of a subscription must be a superuser.
 ALTER ROLE regress_subscription_user2 SUPERUSER;
 -- now it works
 ALTER SUBSCRIPTION testsub OWNER TO regress_subscription_user2;
--- fail - cannot do DROP SUBSCRIPTION DROP SLOT inside transaction block
+ALTER SUBSCRIPTION testsub WITH (slot_name = NONE);
+-- fail - cannot do DROP SUBSCRIPTION CASCADE inside transaction block
 BEGIN;
-DROP SUBSCRIPTION testsub DROP SLOT;
-ERROR:  DROP SUBSCRIPTION ... DROP SLOT cannot run inside a transaction block
+DROP SUBSCRIPTION testsub CASCADE;
+ERROR:  DROP SUBSCRIPTION ... CASCADE cannot run inside a transaction block
 COMMIT;
 BEGIN;
-DROP SUBSCRIPTION testsub NODROP SLOT;
+DROP SUBSCRIPTION testsub;
 COMMIT;
-DROP SUBSCRIPTION IF EXISTS testsub NODROP SLOT;
+DROP SUBSCRIPTION IF EXISTS testsub;
 NOTICE:  subscription "testsub" does not exist, skipping
-DROP SUBSCRIPTION testsub NODROP SLOT;  -- fail
+DROP SUBSCRIPTION testsub;  -- fail
 ERROR:  subscription "testsub" does not exist
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
diff --git a/src/test/regress/sql/object_address.sql b/src/test/regress/sql/object_address.sql
index c698a63..8a738e2 100644
--- a/src/test/regress/sql/object_address.sql
+++ b/src/test/regress/sql/object_address.sql
@@ -40,7 +40,7 @@ CREATE TRANSFORM FOR int LANGUAGE SQL (
 	FROM SQL WITH FUNCTION varchar_transform(internal),
 	TO SQL WITH FUNCTION int4recv(internal));
 CREATE PUBLICATION addr_pub FOR TABLE addr_nsp.gentable;
-CREATE SUBSCRIPTION addr_sub CONNECTION '' PUBLICATION bar WITH (connect = false);
+CREATE SUBSCRIPTION addr_sub CONNECTION '' PUBLICATION bar WITH (connect = false, slot_name = NONE);
 CREATE STATISTICS addr_nsp.gentable_stat ON (a,b) FROM addr_nsp.gentable;
 
 -- test some error cases
@@ -205,7 +205,7 @@ SET client_min_messages TO 'warning';
 
 DROP FOREIGN DATA WRAPPER addr_fdw CASCADE;
 DROP PUBLICATION addr_pub;
-DROP SUBSCRIPTION addr_sub NODROP SLOT;
+DROP SUBSCRIPTION addr_sub;
 
 DROP SCHEMA addr_nsp CASCADE;
 
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 5454420..c27fee5 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -80,17 +80,19 @@ ALTER ROLE regress_subscription_user2 SUPERUSER;
 -- now it works
 ALTER SUBSCRIPTION testsub OWNER TO regress_subscription_user2;
 
--- fail - cannot do DROP SUBSCRIPTION DROP SLOT inside transaction block
+ALTER SUBSCRIPTION testsub WITH (slot_name = NONE);
+
+-- fail - cannot do DROP SUBSCRIPTION CASCADE inside transaction block
 BEGIN;
-DROP SUBSCRIPTION testsub DROP SLOT;
+DROP SUBSCRIPTION testsub CASCADE;
 COMMIT;
 
 BEGIN;
-DROP SUBSCRIPTION testsub NODROP SLOT;
+DROP SUBSCRIPTION testsub;
 COMMIT;
 
-DROP SUBSCRIPTION IF EXISTS testsub NODROP SLOT;
-DROP SUBSCRIPTION testsub NODROP SLOT;  -- fail
+DROP SUBSCRIPTION IF EXISTS testsub;
+DROP SUBSCRIPTION testsub;  -- fail
 
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index b4c5539..be55279 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -191,7 +191,7 @@ $node_publisher->poll_query_until('postgres',
   or die "Timed out while waiting for apply to restart";
 
 # check all the cleanup
-$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed DROP SLOT");
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed CASCADE");
 
 $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index 8ece7dd..779c668 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -52,7 +52,7 @@ my $result =
 is($result, qq(10), 'initial data synced for first sub');
 
 # drop subscription so that there is unreplicated data
-$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub CASCADE");
 
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_rep SELECT generate_series(11,20)");
@@ -89,8 +89,8 @@ $node_subscriber->poll_query_until('postgres', "SELECT pid IS NOT NULL FROM pg_s
   or die "Timed out while waiting for subscriber to start";
 
 # and drop both subscriptions
-$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
-$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub2");
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub CASCADE");
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub2 CASCADE");
 
 # check subscriptions are removed
 $result =
@@ -154,7 +154,7 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep_next");
 is($result, qq(20), 'changes for table added after subscription initialized replicated');
 
-$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub CASCADE");
 
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
-- 
2.7.4

