On Fri, May 29, 2020 at 06:09:06PM +0900, Masahiko Sawada wrote:
> Yes. Conversely, if we start logical replication in a physical
> replication connection (i.g. replication=true) we got an error before
> staring replication:
> 
> ERROR:  logical decoding requires a database connection
> 
> I think we can prevent that SEGV in a similar way.

Still unconvinced as this restriction stands for logical decoding
requiring a database connection but it is not necessarily true now as
physical replication has less restrictions than a logical one.

Looking at the code, I think that there is some confusion with the
fake WAL reader used as base reference in InitWalSender() where we
assume that it could only be used in the context of a non-database WAL
sender.  However, this initialization happens when the WAL sender
connection is initialized, and what I think this misses is that we 
should try to initialize a WAL reader when actually going through a
START_REPLICATION command.

I can note as well that StartLogicalReplication() moves in this sense
by setting xlogreader to be the one from logical_decoding_ctx once the
decoding context has been created.

This results in the attached.  The extra test from upthread to check
that logical decoding is not allowed in a non-database WAL sender is a
good idea, so I have kept it.
--
Michael
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index d930fe957d..b0f2a6ed43 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -262,10 +262,6 @@ extern XLogReaderRoutine *LocalXLogReaderRoutine(void);
 /* Free an XLogReader */
 extern void XLogReaderFree(XLogReaderState *state);
 
-/* Initialize supporting structures */
-extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
-							   int segsize, const char *waldir);
-
 /* Position the XLogReader to given record */
 extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr);
 #ifdef FRONTEND
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 5995798b58..cb76be4f46 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -44,6 +44,8 @@ static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
 							XLogRecPtr recptr);
 static void ResetDecoder(XLogReaderState *state);
+static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
+							   int segsize, const char *waldir);
 
 /* size of the buffer allocated for error message. */
 #define MAX_ERRORMSG_LEN 1000
@@ -210,7 +212,7 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
 /*
  * Initialize the passed segment structs.
  */
-void
+static void
 WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 				   int segsize, const char *waldir)
 {
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 86847cbb54..aeda307c6b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -130,13 +130,11 @@ bool		log_replication_commands = false;
 bool		wake_wal_senders = false;
 
 /*
- * Physical walsender does not use xlogreader to read WAL, but it does use a
- * fake one to keep state.  Logical walsender uses a proper xlogreader.  Both
- * keep the 'xlogreader' pointer to the right one, for the sake of common
- * routines.
+ * xlogreader used for replication.  Note that a WAL sender doing physical
+ * replication does not need xlogreader to read WAL, but it needs one to
+ * keep a state of its work.
  */
-static XLogReaderState fake_xlogreader;
-static XLogReaderState *xlogreader;
+static XLogReaderState *xlogreader = NULL;
 
 /*
  * These variables keep track of the state of the timeline we're currently
@@ -285,20 +283,6 @@ InitWalSender(void)
 
 	/* Initialize empty timestamp buffer for lag tracking. */
 	lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
-
-	/*
-	 * Prepare physical walsender's fake xlogreader struct.  Logical walsender
-	 * does this later.
-	 */
-	if (!am_db_walsender)
-	{
-		xlogreader = &fake_xlogreader;
-		xlogreader->routine =
-			*XL_ROUTINE(.segment_open = WalSndSegmentOpen,
-						.segment_close = wal_segment_close);
-		WALOpenSegmentInit(&xlogreader->seg, &xlogreader->segcxt,
-						   wal_segment_size, NULL);
-	}
 }
 
 /*
@@ -594,6 +578,18 @@ StartReplication(StartReplicationCmd *cmd)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
 
+	/* create xlogreader for physical replication */
+	xlogreader =
+		XLogReaderAllocate(wal_segment_size, NULL,
+						   XL_ROUTINE(.segment_open = WalSndSegmentOpen,
+									  .segment_close = wal_segment_close),
+						   NULL);
+
+	if (!xlogreader)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory")));
+
 	/*
 	 * We assume here that we're logging enough information in the WAL for
 	 * log-shipping, since this is checked in PostmasterMain().
@@ -1643,6 +1639,8 @@ exec_replication_command(const char *cmd_string)
 					StartReplication(cmd);
 				else
 					StartLogicalReplication(cmd);
+
+				Assert(xlogreader != NULL);
 				break;
 			}
 
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index ee05535b1c..78229a7b92 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -7,7 +7,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 13;
+use Test::More tests => 14;
 use Config;
 
 # Initialize master node
@@ -36,6 +36,15 @@ ok( $stderr =~
 	  m/replication slot "test_slot" was not created in this database/,
 	"Logical decoding correctly fails to start");
 
+# Check case of walsender not using a database connection.  Logical
+# decoding should not be allowed.
+($result, $stdout, $stderr) = $node_master->psql(
+	'template1',
+	qq[START_REPLICATION SLOT s1 LOGICAL 0/1],
+	replication => 'true');
+ok($stderr =~ /ERROR:  logical decoding requires a database connection/,
+	"Logical decoding fails on non-database connection");
+
 $node_master->safe_psql('postgres',
 	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]
 );

Attachment: signature.asc
Description: PGP signature

Reply via email to