On 2021-May-10, Peter Smith wrote:

> PSA v5 of the patch. It is the same as v4 but with the v4-0001 part
> omitted because that was already pushed.

I made a few whitespace adjustments on Friday that I didn't get time to
push, so I left the whole set until after the minors are finalized this
week.  I'll get them pushed on Wednesday or so.  (The back branches have
a few conflicts, on every release, but I see no reason to post those, and
it'd upset the cfbot.)

-- 
Álvaro Herrera       Valdivia, Chile
>From 8494316bef64cde7476146bce0bda9a93890def7 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.sm...@fujitsu.com>
Date: Fri, 7 May 2021 17:51:12 +1000
Subject: [PATCH v6] Rename the logical replication global wrconn.

The worker.c global wrconn is only meant to be used for logical apply/tablesync
workers, but there are other variables with the same name. To reduce future confusion,
the global has been renamed from "wrconn" to "LogRepWorkerWalRcvConn".
---
 src/backend/replication/logical/launcher.c  |  4 +--
 src/backend/replication/logical/tablesync.c | 35 ++++++++++++---------
 src/backend/replication/logical/worker.c    | 23 +++++++-------
 src/include/replication/worker_internal.h   |  2 +-
 4 files changed, 36 insertions(+), 28 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index cb462a052a..f97ab12675 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -649,8 +649,8 @@ static void
 logicalrep_worker_onexit(int code, Datum arg)
 {
 	/* Disconnect gracefully from the remote side. */
-	if (wrconn)
-		walrcv_disconnect(wrconn);
+	if (LogRepWorkerWalRcvConn)
+		walrcv_disconnect(LogRepWorkerWalRcvConn);
 
 	logicalrep_worker_detach();
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 0638f5c7f8..67f907cdd9 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -302,8 +302,11 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 								   MyLogicalRepWorker->relstate,
 								   MyLogicalRepWorker->relstate_lsn);
 
-		/* End wal streaming so wrconn can be re-used to drop the slot. */
-		walrcv_endstreaming(wrconn, &tli);
+		/*
+		 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
+		 * the slot.
+		 */
+		walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
 
 		/*
 		 * Cleanup the tablesync slot.
@@ -322,7 +325,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		 * otherwise, it won't be dropped till the corresponding subscription
 		 * is dropped. So passing missing_ok = false.
 		 */
-		ReplicationSlotDropAtPubNode(wrconn, syncslotname, false);
+		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
 
 		finish_sync_worker();
 	}
@@ -642,7 +645,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
 		for (;;)
 		{
 			/* Try read the data. */
-			len = walrcv_receive(wrconn, &buf, &fd);
+			len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
 			CHECK_FOR_INTERRUPTS();
 
@@ -715,7 +718,8 @@ fetch_remote_table_info(char *nspname, char *relname,
 					 "   AND c.relname = %s",
 					 quote_literal_cstr(nspname),
 					 quote_literal_cstr(relname));
-	res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow);
+	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+					  lengthof(tableRow), tableRow);
 
 	if (res->status != WALRCV_OK_TUPLES)
 		ereport(ERROR,
@@ -752,9 +756,11 @@ fetch_remote_table_info(char *nspname, char *relname,
 					 "   AND a.attrelid = %u"
 					 " ORDER BY a.attnum",
 					 lrel->remoteid,
-					 (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
+					 (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
+						 "AND a.attgenerated = ''" : ""),
 					 lrel->remoteid);
-	res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow);
+	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+					  lengthof(attrRow), attrRow);
 
 	if (res->status != WALRCV_OK_TUPLES)
 		ereport(ERROR,
@@ -841,7 +847,7 @@ copy_table(Relation rel)
 		appendStringInfo(&cmd, " FROM %s) TO STDOUT",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
 	}
-	res = walrcv_exec(wrconn, cmd.data, 0, NULL);
+	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
 	pfree(cmd.data);
 	if (res->status != WALRCV_OK_COPY_OUT)
 		ereport(ERROR,
@@ -957,8 +963,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	 * application_name, so that it is different from the main apply worker,
 	 * so that synchronous replication can distinguish them.
 	 */
-	wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
-	if (wrconn == NULL)
+	LogRepWorkerWalRcvConn =
+		walrcv_connect(MySubscription->conninfo, true, slotname, &err);
+	if (LogRepWorkerWalRcvConn == NULL)
 		ereport(ERROR,
 				(errmsg("could not connect to the publisher: %s", err)));
 
@@ -985,7 +992,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		 * breakdown then it wouldn't have succeeded so trying it next time
 		 * seems like a better bet.
 		 */
-		ReplicationSlotDropAtPubNode(wrconn, slotname, true);
+		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
 	}
 	else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
 	{
@@ -1038,7 +1045,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	 * ensures that both the replication slot we create (see below) and the
 	 * COPY are consistent with each other.
 	 */
-	res = walrcv_exec(wrconn,
+	res = walrcv_exec(LogRepWorkerWalRcvConn,
 					  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
 					  0, NULL);
 	if (res->status != WALRCV_OK_COMMAND)
@@ -1058,7 +1065,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	 * slot leading to a dangling slot on the server.
 	 */
 	HOLD_INTERRUPTS();
-	walrcv_create_slot(wrconn, slotname, false /* permanent */ ,
+	walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ ,
 					   CRS_USE_SNAPSHOT, origin_startpos);
 	RESUME_INTERRUPTS();
 
@@ -1100,7 +1107,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	copy_table(rel);
 	PopActiveSnapshot();
 
-	res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+	res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
 	if (res->status != WALRCV_OK_COMMAND)
 		ereport(ERROR,
 				(errmsg("table copy could not finish transaction on publisher: %s",
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d9f157172b..1432554d5a 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -156,7 +156,7 @@ MemoryContext ApplyContext = NULL;
 /* per stream context for streaming transactions */
 static MemoryContext LogicalStreamingContext = NULL;
 
-WalReceiverConn *wrconn = NULL;
+WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 
 Subscription *MySubscription = NULL;
 bool		MySubscriptionValid = false;
@@ -2126,7 +2126,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		MemoryContextSwitchTo(ApplyMessageContext);
 
-		len = walrcv_receive(wrconn, &buf, &fd);
+		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
 		if (len != 0)
 		{
@@ -2206,7 +2206,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					MemoryContextReset(ApplyMessageContext);
 				}
 
-				len = walrcv_receive(wrconn, &buf, &fd);
+				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 			}
 		}
 
@@ -2312,7 +2312,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	}
 
 	/* All done */
-	walrcv_endstreaming(wrconn, &tli);
+	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
 }
 
 /*
@@ -2396,7 +2396,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 		 LSN_FORMAT_ARGS(writepos),
 		 LSN_FORMAT_ARGS(flushpos));
 
-	walrcv_send(wrconn, reply_message->data, reply_message->len);
+	walrcv_send(LogRepWorkerWalRcvConn,
+				reply_message->data, reply_message->len);
 
 	if (recvpos > last_recvpos)
 		last_recvpos = recvpos;
@@ -3090,9 +3091,9 @@ ApplyWorkerMain(Datum main_arg)
 		origin_startpos = replorigin_session_get_progress(false);
 		CommitTransactionCommand();
 
-		wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
-								&err);
-		if (wrconn == NULL)
+		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+												MySubscription->name, &err);
+		if (LogRepWorkerWalRcvConn == NULL)
 			ereport(ERROR,
 					(errmsg("could not connect to the publisher: %s", err)));
 
@@ -3100,7 +3101,7 @@ ApplyWorkerMain(Datum main_arg)
 		 * We don't really use the output identify_system for anything but it
 		 * does some initializations on the upstream so let's still call it.
 		 */
-		(void) walrcv_identify_system(wrconn, &startpointTLI);
+		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
 	}
 
 	/*
@@ -3116,14 +3117,14 @@ ApplyWorkerMain(Datum main_arg)
 	options.startpoint = origin_startpos;
 	options.slotname = myslotname;
 	options.proto.logical.proto_version =
-		walrcv_server_version(wrconn) >= 140000 ?
+		walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ?
 		LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
 	options.proto.logical.publication_names = MySubscription->publications;
 	options.proto.logical.binary = MySubscription->binary;
 	options.proto.logical.streaming = MySubscription->stream;
 
 	/* Start normal logical streaming replication. */
-	walrcv_startstreaming(wrconn, &options);
+	walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
 
 	/* Run the main loop. */
 	LogicalRepApplyLoop(origin_startpos);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 1cac75e5a9..179eb43900 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -62,7 +62,7 @@ typedef struct LogicalRepWorker
 extern MemoryContext ApplyContext;
 
 /* libpqreceiver connection */
-extern struct WalReceiverConn *wrconn;
+extern struct WalReceiverConn *LogRepWorkerWalRcvConn;
 
 /* Worker and subscription objects. */
 extern Subscription *MySubscription;
-- 
2.20.1

Reply via email to