>From a25ce7a17a4b2e814b74e5b6c845652498e6c2d3 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Thu, 11 May 2017 16:07:17 +0200
Subject: [PATCH] Check relkind of tables in CREATE/ALTER SUBSCRIPTION

We used to only check supported relkind on subscriber during the
replication, which is needed to ensure setup is valid and we don't
crash. However it's also useful to tell user immediately when CREATE or
ALTER SUBSCRIPTION is executed, that relation being added to
subscription is not of supported relkind.
---
 src/backend/commands/subscriptioncmds.c    | 11 +++++++++++
 src/backend/executor/execReplication.c     | 20 ++++++++++++++++++++
 src/backend/replication/logical/relation.c | 13 ++++---------
 src/include/executor/executor.h            |  2 ++
 4 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b76cdc5..49c2cef 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -33,6 +33,8 @@
 #include "commands/event_trigger.h"
 #include "commands/subscriptioncmds.h"
 
+#include "executor/executor.h"
+
 #include "nodes/makefuncs.h"
 
 #include "replication/logicallauncher.h"
@@ -443,6 +445,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 
 				relid = RangeVarGetRelid(rv, AccessShareLock, false);
 
+				/* Check for supported relkind. */
+				CheckSubscriptionRelkind(get_rel_relkind(relid),
+										 rv->schemaname, rv->relname);
+
 				SetSubscriptionRelState(subid, relid, table_state,
 										InvalidXLogRecPtr);
 			}
@@ -555,6 +561,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		Oid			relid;
 
 		relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+		/* Check for supported relkind. */
+		CheckSubscriptionRelkind(get_rel_relkind(relid),
+								 rv->schemaname, rv->relname);
+
 		pubrel_local_oids[off++] = relid;
 
 		if (!bsearch(&relid, subrel_local_oids,
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 327a0ba..6af8018 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -551,3 +551,23 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
 						RelationGetRelationName(rel)),
 				 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
 }
+
+
+/*
+ * Check if we support writing into specific relkind.
+ *
+ * The nspname and relname are only needed for error reporting.
+ */
+void
+CheckSubscriptionRelkind(char relkind, const char *nspname,
+						 const char *relname)
+{
+	/*
+	 * We currently only support writing to regular tables.
+	 */
+	if (relkind != RELKIND_RELATION)
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("logical replication target relation \"%s.%s\" is not a table",
+						nspname, relname)));
+}
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 7c93bfb..590355a 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -20,6 +20,7 @@
 #include "access/sysattr.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_subscription_rel.h"
+#include "executor/executor.h"
 #include "nodes/makefuncs.h"
 #include "replication/logicalrelation.h"
 #include "replication/worker_internal.h"
@@ -258,15 +259,9 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 							remoterel->nspname, remoterel->relname)));
 		entry->localrel = heap_open(relid, NoLock);
 
-		/*
-		 * We currently only support writing to regular and partitioned
-		 * tables.
-		 */
-		if (entry->localrel->rd_rel->relkind != RELKIND_RELATION)
-			ereport(ERROR,
-					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-					 errmsg("logical replication target relation \"%s.%s\" is not a table",
-							remoterel->nspname, remoterel->relname)));
+		/* Check for supported relkind. */
+		CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
+								 remoterel->nspname, remoterel->relname);
 
 		/*
 		 * Build the mapping of local attribute numbers to remote attribute
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 3107cf5..daae7aa 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -534,5 +534,7 @@ extern void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
 						 TupleTableSlot *searchslot);
 extern void CheckCmdReplicaIdentity(Relation rel, CmdType cmd);
 
+extern void CheckSubscriptionRelkind(char relkind, const char *nspname,
+									 const char *relname);
 
 #endif   /* EXECUTOR_H  */
-- 
2.7.4

