Comments on 0004-Add-logical-replication-workers-v16.patch.gz:

I didn't find any major problems.  At times while I was testing strange
things it was not clear why "nothing is happening".  I'll do some more
checking in that direction.

The attached fixup patch improves some error messages, fixes some
typos, and makes other minor changes.  See also the comments below.

---

The way check_max_logical_replication_workers() is implemented creates
potential ordering dependencies in postgresql.conf.  For example,

max_logical_replication_workers = 100
max_worker_processes = 200

fails, but if you change the order, it works.  The existing
check_max_worker_processes() has the same problem, but I suspect because
it only checks against MAX_BACKENDS, nobody has ever seriously hit that
limit.

I suggest just removing the check.  If you set
max_logical_replication_workers higher than max_worker_processes and you
hit the lower limit, then whatever is controlling max_worker_processes
should complain with its own error message.
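
To illustrate the ordering dependency, here is a small standalone
simulation.  It assumes the check hook simply compares the new value
against the current value of max_worker_processes at the time the line
is processed; the function names (check_logical_workers, apply_setting,
apply_file, reset_settings) are made up for this sketch and are not
taken from the patch or from guc.c.

```c
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Simulated settings; 8 is PostgreSQL's default for max_worker_processes. */
static int	max_worker_processes = 8;
static int	max_logical_replication_workers = 4;

/* Restore the simulated defaults before processing a file. */
static void
reset_settings(void)
{
	max_worker_processes = 8;
	max_logical_replication_workers = 4;
}

/*
 * Hypothetical check hook: validates the new value against whatever
 * max_worker_processes holds at the moment the line is processed.
 */
static bool
check_logical_workers(int newval)
{
	return newval <= max_worker_processes;
}

/* Apply one "name = value" line; returns false if the value is rejected. */
static bool
apply_setting(const char *name, int value)
{
	if (strcmp(name, "max_worker_processes") == 0)
	{
		max_worker_processes = value;
		return true;
	}
	if (strcmp(name, "max_logical_replication_workers") == 0)
	{
		if (!check_logical_workers(value))
			return false;
		max_logical_replication_workers = value;
		return true;
	}
	return false;				/* unknown setting */
}

/* Process settings in file order; returns the number of rejected lines. */
static int
apply_file(const char *const names[], const int values[], int n)
{
	int			rejected = 0;
	int			i;

	for (i = 0; i < n; i++)
	{
		if (!apply_setting(names[i], values[i]))
		{
			printf("rejected: %s = %d\n", names[i], values[i]);
			rejected++;
		}
	}
	return rejected;
}
```

Feeding the two lines in the order shown above rejects the first one,
because it is checked against the default of 8; swapping the lines
makes both succeed.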

---

The default for max_logical_replication_workers is 4, which seems very
low.  Maybe it should be more like 10 or 20.  The "Quick setup"
section recommends changing it to 10.  We should at least be
consistent there: If you set a default value that is not 0, then it
should be high enough that we don't need to change it again in the
Quick setup.  (Maybe the default max_worker_processes should also be
raised?)

+max_logical_replication_workers = 10 # one per subscription + one per instance needed on subscriber

I think this is incorrect (copied from max_worker_processes?).  The
launcher does not count as one of the workers here.

On a related note, should the minimum not be 0 instead of 1?

---

About the changes to libpqrcv_startstreaming().  The timeline is not
really an option in the syntax.  Just passing in a string that is
pasted in the final command creates too much coupling, I think.  I
would keep the old timeline (TimeLineID tli) argument, and make the
options const char * [], and let startstreaming() assemble the final
string, including commas and parentheses.  It's still not a perfect
abstraction, because you need to do the quoting yourself, but much
better.  (Alternatively, get rid of the startstreaming call and just
have callers use libpqrcv_PQexec directly.)
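
A rough standalone sketch of what I have in mind follows.  The names
(build_start_replication, append) and the exact argument list are
invented for illustration and are not the patch's API; the LSN is
passed as two 32-bit halves only to keep the example free of
PostgreSQL headers.  As noted above, the caller is still responsible
for quoting each option.

```c
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>

/* Append formatted text at *off; returns 0 on success, -1 if it won't fit. */
static int
append(char *buf, size_t buflen, size_t *off, const char *fmt, ...)
{
	va_list		ap;
	int			n;

	if (*off >= buflen)
		return -1;
	va_start(ap, fmt);
	n = vsnprintf(buf + *off, buflen - *off, fmt, ap);
	va_end(ap);
	if (n < 0 || (size_t) n >= buflen - *off)
		return -1;
	*off += (size_t) n;
	return 0;
}

/*
 * Hypothetical helper: assemble the START_REPLICATION command.  The
 * timeline stays a separate argument (used for physical replication),
 * and the options array is joined with commas and wrapped in
 * parentheses here rather than by the caller.
 */
static int
build_start_replication(char *buf, size_t buflen, const char *slotname,
						int logical, uint32_t lsn_hi, uint32_t lsn_lo,
						uint32_t tli, const char *const *options,
						int noptions)
{
	size_t		off = 0;
	int			i;

	if (append(buf, buflen, &off, "START_REPLICATION") < 0)
		return -1;
	if (slotname != NULL &&
		append(buf, buflen, &off, " SLOT \"%s\"", slotname) < 0)
		return -1;
	if (logical && append(buf, buflen, &off, " LOGICAL") < 0)
		return -1;
	if (append(buf, buflen, &off, " %X/%X",
			   (unsigned int) lsn_hi, (unsigned int) lsn_lo) < 0)
		return -1;
	if (!logical && tli != 0 &&
		append(buf, buflen, &off, " TIMELINE %u", (unsigned int) tli) < 0)
		return -1;

	if (logical && noptions > 0)
	{
		for (i = 0; i < noptions; i++)
		{
			if (append(buf, buflen, &off, i == 0 ? " (%s" : ", %s",
					   options[i]) < 0)
				return -1;
		}
		if (append(buf, buflen, &off, ")") < 0)
			return -1;
	}
	return 0;
}
```

With something along these lines, the apply worker would pass its
options as separate array elements and never see the command syntax,
while the physical walreceiver would keep passing just the timeline.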

---

Some of the header files are named inconsistently with their .c files.
I think src/include/replication/logicalworker.h should be split into
logicalapply.h and logicallauncher.h.  Not sure about
worker_internal.h.  Maybe rename apply.c to worker.c?

(I'm also not fond of throwing publicationcmds.h and
subscriptioncmds.h together into replicationcmds.h.  Maybe that could
be changed, too.)

---

There are various FATAL errors in logical/relation.c that are raised
when the target relation is not in the right state.  Could those not
be ERRORs?  The behavior is the same at the moment because background
workers terminate on uncaught exceptions, but that should eventually
be improved.

A FATAL error will lead to a

LOG:  unexpected EOF on standby connection

on the publisher, because the process just dies without protocol
shutdown.  (And then it reconnects and tries again.  So we might as
well not die and just retry.)

---

In LogicalRepRelMapEntry, rename rel to localrel, so it's clearer in
the code using this struct.  (Maybe reloid -> localreloid)

---

Partitioned tables are not supported in either publications or as
replication targets.  This is expected but should be fixed before the
final release.

---

In apply.c:

The comment in apply_handle_relation() makes a point that the schema
validation is done later, but does not say why.  The answer is
probably that it doesn't matter and it's more convenient, but it
should be explained in the comment.

See XXX comment in logicalrep_worker_stop().

The get_flush_position() return value is not intuitive from the
function name.  Maybe make that another pointer argument for clarity.

reread_subscription() complains if the subscription name was changed.
I don't know why that is a problem.

---

In launcher.c:

pg_stat_get_subscription should hold LogicalRepWorkerLock around the
whole loop, so that it doesn't get inconsistent results when workers
change during the loop.

---

In relation.c:

Inconsistent use of uint32 vs LogicalRepRelId.  Pick one. :)

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 30e7b9d70b5c6488dd74eb648c62372911096544 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pete...@gmx.net>
Date: Thu, 5 Jan 2017 12:00:00 -0500
Subject: [PATCH] fixup! Add logical replication workers

---
 doc/src/sgml/catalogs.sgml                 |  6 ++
 doc/src/sgml/ref/create_subscription.sgml  |  2 +-
 src/backend/executor/execReplication.c     |  6 +-
 src/backend/replication/logical/apply.c    | 88 ++++++++++++++++--------------
 src/backend/replication/logical/launcher.c | 34 ++++++------
 src/backend/replication/logical/relation.c | 42 ++++++--------
 src/include/replication/worker_internal.h  |  2 +-
 src/test/subscription/Makefile             |  4 +-
 8 files changed, 95 insertions(+), 89 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 2aba25537b..85bea7e12a 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -6316,6 +6316,12 @@ <title><structname>pg_subscription</structname></title>
    database.
   </para>
 
+  <para>
+   Access to this catalog is restricted from normal users.  Normal users can
+   use the view <xref linkend="pg-stat-subscription"> to get some information
+   about subscriptions.
+  </para>
+
   <table>
    <title><structname>pg_subscription</structname> Columns</title>
 
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 5301abc4ef..40d08b3440 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -4,7 +4,7 @@
 -->
 
 <refentry id="SQL-CREATESUBSCRIPTION">
- <indexterm zone="sql-altersubscription">
+ <indexterm zone="sql-createsubscription">
   <primary>CREATE SUBSCRIPTION</primary>
  </indexterm>
 
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 897118f52a..259eb32a18 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -68,7 +68,6 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 		RegProcedure regop;
 		int			pkattno = attoff + 1;
 		int			mainattno = indkey->values[attoff];
-		Oid			atttype = attnumTypeId(rel, mainattno);
 		Oid			optype = get_opclass_input_type(opclass->values[attoff]);
 
 		/*
@@ -82,9 +81,8 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 									   BTEqualStrategyNumber);
 
 		if (!OidIsValid(operator))
-			elog(ERROR,
-				 "could not lookup equality operator for type %u, optype %u in opfamily %u",
-				 atttype, optype, opfamily);
+			elog(ERROR, "could not find member %d(%u,%u) of opfamily %u",
+				 BTEqualStrategyNumber, optype, optype, opfamily);
 
 		regop = get_opcode(operator);
 
diff --git a/src/backend/replication/logical/apply.c b/src/backend/replication/logical/apply.c
index 94859db320..905b4c9b36 100644
--- a/src/backend/replication/logical/apply.c
+++ b/src/backend/replication/logical/apply.c
@@ -254,10 +254,9 @@ slot_store_error_callback(void *arg)
 
 	remotetypoid = errarg->rel->atttyps[errarg->attnum];
 	localtypoid = logicalrep_typmap_getid(remotetypoid);
-	errcontext("processing remote data for replication target %s column %s, "
+	errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
 			   "remote type %s, local type %s",
-			   quote_qualified_identifier(errarg->rel->nspname,
-										  errarg->rel->relname),
+			   errarg->rel->nspname, errarg->rel->relname,
 			   errarg->rel->attnames[errarg->attnum],
 			   format_type_be(remotetypoid),
 			   format_type_be(localtypoid));
@@ -311,7 +310,7 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
 		else
 		{
 			/*
-			 * We assign NULL dropped attributes, NULL values and missing
+			 * We assign NULL to dropped attributes, NULL values, and missing
 			 * values (missing values should be later filled using
 			 * slot_fill_defaults).
 			 */
@@ -329,7 +328,7 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
 /*
  * Modify slot with user data provided as C strigs.
  * This is somewhat similar to heap_modify_tuple but also calls the type
- * input fuction on the user data as the input is the rext representation
+ * input fuction on the user data as the input is the text representation
  * of the types.
  */
 static void
@@ -445,14 +444,16 @@ apply_handle_origin(StringInfo s)
 	 * any actual writes.
 	 */
 	if (!in_remote_transaction || IsTransactionState())
-		elog(ERROR, "ORIGIN message sent out of order");
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("ORIGIN message sent out of order")));
 }
 
 /*
  * Handle RELATION message.
  *
  * Note we don't do validation against local schema here. The validation is
- * posponed until first change for given relation comes.
+ * postponed until first change for given relation comes. XXX WHY?
  */
 static void
 apply_handle_relation(StringInfo s)
@@ -479,7 +480,7 @@ apply_handle_type(StringInfo s)
 }
 
 /*
- * Get replicat identity index or if it is not defined a primary key.
+ * Get replica identity index or if it is not defined a primary key.
  *
  * If neither is defined, returns InvalidOid
  */
@@ -562,19 +563,17 @@ check_relation_updatable(LogicalRepRelMapEntry *rel)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("publisher does not send replica identity column "
-						"expected by the logical replication target %s",
-						quote_qualified_identifier(rel->remoterel.nspname,
-												   rel->remoterel.relname))));
+						"expected by the logical replication target relation \"%s.%s\"",
+						rel->remoterel.nspname, rel->remoterel.relname)));
 	}
 
 	ereport(ERROR,
 			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-			 errmsg("the logical replication target %s has "
+			 errmsg("logical replication target relation \"%s.%s\" has "
 					"neither REPLICA IDENTIY index nor PRIMARY "
 					"KEY and published relation does not have "
 					"REPLICA IDENTITY FULL",
-					quote_qualified_identifier(rel->remoterel.nspname,
-											   rel->remoterel.relname))));
+					rel->remoterel.nspname, rel->remoterel.relname)));
 }
 
 /*
@@ -665,12 +664,12 @@ apply_handle_update(StringInfo s)
 		/*
 		 * The tuple to be updated could not be found.
 		 *
-		 * TODO what to do here, change the loglevel to LOG perhaps?
+		 * TODO what to do here, change the log level to LOG perhaps?
 		 */
-		ereport(DEBUG1,
-				(errmsg("logical replication could not find row for update "
-						"in replication target %s",
-						RelationGetRelationName(rel->rel))));
+		elog(DEBUG1,
+			 "logical replication did not find row for update "
+			 "in replication target relation \"%s\"",
+			 RelationGetRelationName(rel->rel));
 	}
 
 	/* Cleanup. */
@@ -816,7 +815,9 @@ apply_dispatch(StringInfo s)
 			apply_handle_origin(s);
 			break;
 		default:
-			elog(ERROR, "unknown action of type %c", action);
+			ereport(ERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("invalid logical replication message type %c", action)));
 	}
 }
 
@@ -952,7 +953,8 @@ ApplyLoop(void)
 				}
 				else if (len < 0)
 				{
-					elog(NOTICE, "data stream from publisher has ended");
+					ereport(LOG,
+							(errmsg("data stream from publisher has ended")));
 					endofstream = true;
 					break;
 				}
@@ -1218,9 +1220,10 @@ reread_subscription(void)
 	 */
 	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
 	{
-		elog(LOG, "logical replication worker for subscription %s will "
-			 "restart because the connection info was changed",
-			 MySubscription->name);
+		ereport(LOG,
+				(errmsg("logical replication worker for subscription \"%s\" will "
+						"restart because the connection information was changed",
+						MySubscription->name)));
 
 		walrcv_disconnect(wrconn);
 		proc_exit(0);
@@ -1232,9 +1235,10 @@ reread_subscription(void)
 	 */
 	if (!equal(newsub->publications, MySubscription->publications))
 	{
-		elog(LOG, "logical replication worker for subscription %s will "
-			 "restart because subscription's publications were changed",
-			 MySubscription->name);
+		ereport(LOG,
+				(errmsg("logical replication worker for subscription \"%s\" will "
+						"restart because subscription's publications were changed",
+						MySubscription->name)));
 
 		walrcv_disconnect(wrconn);
 		proc_exit(0);
@@ -1247,9 +1251,10 @@ reread_subscription(void)
 	 */
 	if (!newsub)
 	{
-		elog(LOG, "logical replication worker for subscription %s has "
-			 "stopped because the subscription was removed",
-			 MySubscription->name);
+		ereport(LOG,
+				(errmsg("logical replication worker for subscription \"%s\" will "
+						"stop because the subscription was removed",
+						MySubscription->name)));
 
 		walrcv_disconnect(wrconn);
 		proc_exit(0);
@@ -1262,9 +1267,10 @@ reread_subscription(void)
 	 */
 	if (!newsub->enabled)
 	{
-		elog(LOG, "logical replication worker for subscription %s has "
-			 "stopped because the subscription was disabled",
-			 MySubscription->name);
+		ereport(LOG,
+				(errmsg("logical replication worker for subscription \"%s\" will "
+						"stop because the subscription was disabled",
+						MySubscription->name)));
 
 		walrcv_disconnect(wrconn);
 		proc_exit(0);
@@ -1354,9 +1360,10 @@ ApplyWorkerMain(Datum main_arg)
 
 	if (!MySubscription->enabled)
 	{
-		elog(LOG, "logical replication worker for subscription %s will not "
-			 "start because the subscription was disabled during startup",
-			 MySubscription->name);
+		ereport(LOG,
+				(errmsg("logical replication worker for subscription \"%s\" will not "
+						"start because the subscription was disabled during startup",
+						MySubscription->name)));
 
 		proc_exit(0);
 	}
@@ -1366,8 +1373,9 @@ ApplyWorkerMain(Datum main_arg)
 								  subscription_change_cb,
 								  (Datum) 0);
 
-	elog(LOG, "logical replication apply for subscription %s started",
-		 MySubscription->name);
+	ereport(LOG,
+			(errmsg("logical replication apply for subscription \"%s\" has started",
+					MySubscription->name)));
 
 	/* Setup replication origin tracking. */
 	originid = replorigin_by_name(MySubscription->slotname, true);
@@ -1380,7 +1388,7 @@ ApplyWorkerMain(Datum main_arg)
 	CommitTransactionCommand();
 
 	/* Connect to the origin and start the replication. */
-	elog(DEBUG1, "connecting to publisher using connection string %s",
+	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
 	wrconn = walrcv_connect(MySubscription->conninfo, true,
 								MySubscription->name, &err);
@@ -1409,6 +1417,6 @@ ApplyWorkerMain(Datum main_arg)
 
 	walrcv_disconnect(wrconn);
 
-	/* We should only get here if we received sigTERM */
+	/* We should only get here if we received SIGTERM */
 	proc_exit(0);
 }
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 783d97e274..f977effbbc 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -127,9 +127,9 @@ get_subscription_list(void)
 		sub->dbid = subform->subdbid;
 		sub->owner = subform->subowner;
 		sub->enabled = subform->subenabled;
+		sub->name = pstrdup(NameStr(subform->subname));
 
-		/* We don't fill fields we are not intereste in. */
-		sub->name = NULL;
+		/* We don't fill fields we are not interested in. */
 		sub->conninfo = NULL;
 		sub->slotname = NULL;
 		sub->publications = NIL;
@@ -224,7 +224,7 @@ logicalrep_worker_find(Oid subid)
  * Start new apply background worker.
  */
 void
-logicalrep_worker_launch(Oid dbid, Oid subid, Oid userid)
+logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
 {
 	BackgroundWorker	bgw;
 	BackgroundWorkerHandle *bgw_handle;
@@ -232,8 +232,8 @@ logicalrep_worker_launch(Oid dbid, Oid subid, Oid userid)
 	LogicalRepWorker   *worker = NULL;
 
 	ereport(LOG,
-			(errmsg("starting logical replication worker for subscription %u",
-					subid)));
+			(errmsg("starting logical replication worker for subscription \"%s\"",
+					subname)));
 
 	/* Report this after the initial starting message for consistency. */
 	if (max_replication_slots == 0)
@@ -281,7 +281,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, Oid userid)
 	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
 	bgw.bgw_main = ApplyWorkerMain;
 	snprintf(bgw.bgw_name, BGW_MAXLEN,
-			 "logical replication worker %u", subid);
+			 "logical replication worker for subscription %u", subid);
 
 	bgw.bgw_restart_time = BGW_NEVER_RESTART;
 	bgw.bgw_notify_pid = MyProcPid;
@@ -291,8 +291,8 @@ logicalrep_worker_launch(Oid dbid, Oid subid, Oid userid)
 	{
 		ereport(WARNING,
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
-				 errmsg("out of logical replication workers slots"),
-				 errhint("You might need to increase max_logical_replication_workers.")));
+				 errmsg("out of background workers slots"),
+				 errhint("You might need to increase max_worker_processes.")));
 		return;
 	}
 
@@ -304,15 +304,16 @@ logicalrep_worker_launch(Oid dbid, Oid subid, Oid userid)
  * Stop the logical replication worker and wait until it detaches from the
  * slot.
  *
- * Note it's caller's job to ensure that new workers are not being started
- * during this function call. That can be achieven by holding exclusive
- * lock on LogicalRepLauncherLock.
+ * The caller must hold LogicalRepLauncherLock to ensure that new workers are
+ * not being started during this function call.
  */
 void
 logicalrep_worker_stop(Oid subid)
 {
 	LogicalRepWorker *worker;
 
+	Assert(LWLockHeldByMe(LogicalRepLauncherLock));
+
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
 	worker = logicalrep_worker_find(subid);
@@ -364,7 +365,7 @@ logicalrep_worker_stop(Oid subid)
 		int	rc;
 
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-		if (!logicalrep_worker_find(subid))
+		if (!logicalrep_worker_find(subid))  // XXX why not just wait for worker->proc == NULL?
 		{
 			LWLockRelease(LogicalRepWorkerLock);
 			break;
@@ -606,7 +607,7 @@ ApplyLauncherMain(Datum main_arg)
 
 				if (sub->enabled && w == NULL)
 				{
-					logicalrep_worker_launch(sub->dbid, sub->oid, sub->owner);
+					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner);
 					last_start_time = now;
 					wait_time = wal_retrieve_retry_interval;
 					/* Limit to one worker per mainloop cycle. */
@@ -662,7 +663,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 {
 #define PG_STAT_GET_SUBSCRIPTION_COLS	7
 	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
-	int			wid;
+	int			i;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -694,7 +695,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 
 	MemoryContextSwitchTo(oldcontext);
 
-	for (wid = 0; wid <= max_logical_replication_workers; wid++)
+	for (i = 0; i <= max_logical_replication_workers; i++)
 	{
 		/* for each row */
 		Datum		values[PG_STAT_GET_SUBSCRIPTION_COLS];
@@ -703,7 +704,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		LogicalRepWorker	worker;
 
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-		memcpy(&worker, &LogicalRepCtx->workers[wid],
+		memcpy(&worker, &LogicalRepCtx->workers[i],
 			   sizeof(LogicalRepWorker));
 		if (!worker.proc || !IsBackendPid(worker.proc->pid))
 		{
@@ -724,7 +725,6 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		MemSet(values, 0, sizeof(values));
 		MemSet(nulls, 0, sizeof(nulls));
 
-		/* Values available to all callers */
 		values[0] = ObjectIdGetDatum(worker.subid);
 		values[1] = Int32GetDatum(worker_pid);
 		if (XLogRecPtrIsInvalid(worker.last_lsn))
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index b01cdbf744..80e639ae67 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -240,7 +240,7 @@ logicalrep_rel_open(uint32 remoteid, LOCKMODE lockmode)
 						HASH_FIND, &found);
 
 	if (!found)
-		elog(FATAL, "cache lookup failed for remote relation %u",
+		elog(ERROR, "no relation map entry for remote relation ID %u",
 			 remoteid);
 
 	/* Need to update the local cache? */
@@ -260,25 +260,23 @@ logicalrep_rel_open(uint32 remoteid, LOCKMODE lockmode)
 											  remoterel->relname, -1),
 								 lockmode, true);
 		if (!OidIsValid(relid))
-			ereport(FATAL,
+			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("the logical replication target %s not found",
-							quote_qualified_identifier(remoterel->nspname,
-													   remoterel->relname))));
+					 errmsg("logical replication target relation \"%s.%s\" does not exist",
+							remoterel->nspname, remoterel->relname)));
 		entry->rel = heap_open(relid, NoLock);
 
 		/* We currently only support writing to regular tables. */
 		if (entry->rel->rd_rel->relkind != RELKIND_RELATION)
-			ereport(FATAL,
+			ereport(ERROR,
 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-					 errmsg("the logical replication target %s is not a table",
-							quote_qualified_identifier(remoterel->nspname,
-													   remoterel->relname))));
+					 errmsg("logical replication target relation \"%s.%s\" is not a table",
+							remoterel->nspname, remoterel->relname)));
 
 		/*
-		 * Build the mapping of local attribute numbers to remote attrinute
+		 * Build the mapping of local attribute numbers to remote attribute
 		 * numbers and validate that we don't miss any replicated columns
-		 * as that would result in pontentially unwanted data loss.
+		 * as that would result in potentially unwanted data loss.
 		 */
 		desc = RelationGetDescr(entry->rel);
 		oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
@@ -293,17 +291,15 @@ logicalrep_rel_open(uint32 remoteid, LOCKMODE lockmode)
 			entry->attrmap[i] = attnum;
 			if (attnum >= 0)
 				found++;
-
 		}
 
 		/* TODO, detail message with names of missing columns */
 		if (found < remoterel->natts)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("the logical replication target %s is missing "
+					 errmsg("logical replication target relation \"%s.%s\" is missing "
 							"some replicated columns",
-							quote_qualified_identifier(remoterel->nspname,
-													   remoterel->relname))));
+							remoterel->nspname, remoterel->relname)));
 
 		/*
 		 * Check that replica identity matches. We allow for stricter replica
@@ -340,10 +336,9 @@ logicalrep_rel_open(uint32 remoteid, LOCKMODE lockmode)
 			if (!AttrNumberIsForUserDefinedAttr(attnum))
 				ereport(ERROR,
 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-						 errmsg("the logical replication target %s uses "
+						 errmsg("logical replication target relation \"%s.%s\" uses "
 								"system columns in REPLICA IDENTITY index",
-							quote_qualified_identifier(remoterel->nspname,
-													   remoterel->relname))));
+								remoterel->nspname, remoterel->relname)));
 
 			attnum = AttrNumberGetAttrOffset(attnum);
 
@@ -450,7 +445,7 @@ logicalrep_typmap_getid(Oid remoteid)
 	if (remoteid < FirstNormalObjectId)
 	{
 		if (!get_typisdefined(remoteid))
-			ereport(FATAL,
+			ereport(ERROR,
 					(errmsg("builtin type %u not found", remoteid),
 					 errhint("This can be caused by having publisher with "
 							 "higher major version than subscriber")));
@@ -465,7 +460,7 @@ logicalrep_typmap_getid(Oid remoteid)
 						HASH_FIND, &found);
 
 	if (!found)
-		elog(FATAL, "cache lookup failed for remote type %u",
+		elog(ERROR, "no type map entry for remote type %u",
 			 remoteid);
 
 	/* Found and mapped, return the oid. */
@@ -482,11 +477,10 @@ logicalrep_typmap_getid(Oid remoteid)
 		entry->typoid = InvalidOid;
 
 	if (!OidIsValid(entry->typoid))
-		ereport(FATAL,
+		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("type %s required for logical replication not found",
-						quote_qualified_identifier(entry->nspname,
-												   entry->typname))));
+				 errmsg("data type \"%s.%s\" required for logical replication does not exist",
+						entry->nspname, entry->typname)));
 
 	return entry->typoid;
 }
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 4eb5158264..fd843021a2 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -53,7 +53,7 @@ extern bool	got_SIGTERM;
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid);
 extern int logicalrep_worker_count(Oid subid);
-extern void logicalrep_worker_launch(Oid dbid, Oid subid, Oid userid);
+extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid);
 extern void logicalrep_worker_stop(Oid subid);
 extern void logicalrep_worker_wakeup(Oid subid);
 
diff --git a/src/test/subscription/Makefile b/src/test/subscription/Makefile
index cac5c9020d..bb9795453a 100644
--- a/src/test/subscription/Makefile
+++ b/src/test/subscription/Makefile
@@ -9,12 +9,12 @@
 #
 #-------------------------------------------------------------------------
 
-EXTRA_INSTALL = contrib/hstore
-
 subdir = src/test/subscription
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
+EXTRA_INSTALL = contrib/hstore
+
 check:
 	$(prove_check)
 
-- 
2.11.0
