Hi,

On 2013-09-03 11:40:57 -0400, Robert Haas wrote:
> On Fri, Aug 30, 2013 at 11:19 AM, Andres Freund <and...@2ndquadrant.com> 
> wrote:
> > 0005 wal_decoding: Log xl_running_xact's at a higher frequency than 
> > checkpoints are done
> > * benefits hot standby startup

I tried to update the patch to address the comments you made.

> > 0003 wal_decoding: Allow walsender's to connect to a specific database
> > * biggest problem is how to specify the connection we connect
> >   to. Currently with the patch walsender connects to a database if it's
> >   not named "replication" (via dbname). Perhaps it's better to invent a
> >   replication_dbname parameter?

I've updated the patch so it extends the "replication" startup parameter
to accept not only a boolean but also the value "database". In the latter
case it will connect to the database specified in "dbname".
As discussed downthread, this patch doesn't have an immediate advantage
for users until the changeset extraction patch itself is
applied. Whether or not it should be applied separately is unclear.

Greetings,

Andres Freund

-- 
 Andres Freund                     http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
>From 2aa39548f5990e9663e95f011f25a89a0dc8d8a1 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 19 Aug 2013 13:24:30 +0200
Subject: [PATCH 2/9] wal_decoding: Allow walsender's to connect to a specific
 database

Extend the existing 'replication' parameter to not only allow a boolean value
but also "database". If the latter is specified we connect to the database
specified in 'dbname'.

This is useful for future walsender commands which need database interaction,
e.g. changeset extraction.
---
 doc/src/sgml/protocol.sgml                         | 24 +++++++++---
 src/backend/postmaster/postmaster.c                | 23 ++++++++++--
 .../libpqwalreceiver/libpqwalreceiver.c            |  4 +-
 src/backend/replication/walsender.c                | 43 +++++++++++++++++++---
 src/backend/utils/init/postinit.c                  |  5 +++
 src/bin/pg_basebackup/pg_basebackup.c              |  4 +-
 src/bin/pg_basebackup/pg_receivexlog.c             |  4 +-
 src/bin/pg_basebackup/receivelog.c                 |  4 +-
 src/include/replication/walsender.h                |  1 +
 9 files changed, 89 insertions(+), 23 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 0b2e60e..2ea14e5 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1301,10 +1301,13 @@
 
 <para>
 To initiate streaming replication, the frontend sends the
-<literal>replication</> parameter in the startup message. This tells the
-backend to go into walsender mode, wherein a small set of replication commands
-can be issued instead of SQL statements. Only the simple query protocol can be
-used in walsender mode.
+<literal>replication</> parameter in the startup message. A boolean value
+of <literal>true</> tells the backend to go into walsender mode, wherein a
+small set of replication commands can be issued instead of SQL statements. Only
+the simple query protocol can be used in walsender mode.
+Passing <literal>database</> as the value instructs walsender to connect to
+the database specified in the <literal>dbname</> parameter, which will in future
+allow some additional commands to the ones specified below to be run.
 
 The commands accepted in walsender mode are:
 
@@ -1314,7 +1317,7 @@ The commands accepted in walsender mode are:
     <listitem>
      <para>
       Requests the server to identify itself. Server replies with a result
-      set of a single row, containing three fields:
+      set of a single row, containing four fields:
      </para>
 
      <para>
@@ -1356,6 +1359,17 @@ The commands accepted in walsender mode are:
       </listitem>
       </varlistentry>
 
+      <varlistentry>
+      <term>
+       dbname
+      </term>
+      <listitem>
+      <para>
+       Database connected to or NULL.
+      </para>
+      </listitem>
+      </varlistentry>
+
       </variablelist>
      </para>
     </listitem>
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 01d2618..a31b01d 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1894,10 +1894,21 @@ retry1:
 				port->cmdline_options = pstrdup(valptr);
 			else if (strcmp(nameptr, "replication") == 0)
 			{
-				if (!parse_bool(valptr, &am_walsender))
+				/*
+				 * Due to backward compatibility concerns replication is a
+				 * hybrid beast which allows the value to be either a boolean
+				 * or the string 'database'. The latter connects to a specific
+				 * database which is e.g. required for changeset extraction.
+				 */
+				if (strcmp(valptr, "database") == 0)
+				{
+					am_walsender = true;
+					am_db_walsender = true;
+				}
+				else if (!parse_bool(valptr, &am_walsender))
 					ereport(FATAL,
 							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-							 errmsg("invalid value for boolean option \"replication\"")));
+							 errmsg("invalid value for option \"replication\", legal values are false, 0, true, 1 or database")));
 			}
 			else
 			{
@@ -1983,8 +1994,12 @@ retry1:
 	if (strlen(port->user_name) >= NAMEDATALEN)
 		port->user_name[NAMEDATALEN - 1] = '\0';
 
-	/* Walsender is not related to a particular database */
-	if (am_walsender)
+	/*
+	 * Generic walsender, e.g. for streaming replication, is not connected to a
+	 * particular database. But walsenders used for logical replication need to
+	 * connect to a specific database.
+	 */
+	if (am_walsender && !am_db_walsender)
 		port->database_name[0] = '\0';
 
 	/*
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 6bc0aa1..ee0f1fe 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -130,7 +130,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
 						"the primary server: %s",
 						PQerrorMessage(streamConn))));
 	}
-	if (PQnfields(res) != 3 || PQntuples(res) != 1)
+	if (PQnfields(res) != 4 || PQntuples(res) != 1)
 	{
 		int			ntuples = PQntuples(res);
 		int			nfields = PQnfields(res);
@@ -138,7 +138,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
 		PQclear(res);
 		ereport(ERROR,
 				(errmsg("invalid response from primary server"),
-				 errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.",
+				 errdetail("Expected 1 tuple with 4 fields, got %d tuples with %d fields.",
 						   ntuples, nfields)));
 	}
 	primary_sysid = PQgetvalue(res, 0, 0);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index afd559d..b00a91a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -46,7 +46,10 @@
 #include "access/timeline.h"
 #include "access/transam.h"
 #include "access/xlog_internal.h"
+#include "access/xact.h"
+
 #include "catalog/pg_type.h"
+#include "commands/dbcommands.h"
 #include "funcapi.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -89,9 +92,10 @@ WalSndCtlData *WalSndCtl = NULL;
 WalSnd	   *MyWalSnd = NULL;
 
 /* Global state */
-bool		am_walsender = false;		/* Am I a walsender process ? */
+bool		am_walsender = false;		/* Am I a walsender process? */
 bool		am_cascading_walsender = false;		/* Am I cascading WAL to
-												 * another standby ? */
+												 * another standby? */
+bool		am_db_walsender = false;		/* connect to database? */
 
 /* User-settable parameters for walsender */
 int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
@@ -243,10 +247,12 @@ IdentifySystem(void)
 	char		tli[11];
 	char		xpos[MAXFNAMELEN];
 	XLogRecPtr	logptr;
+	char*        dbname = NULL;
 
 	/*
-	 * Reply with a result set with one row, three columns. First col is
-	 * system ID, second is timeline ID, and third is current xlog location.
+	 * Reply with a result set with one row, four columns. First col is system
+	 * ID, second is timeline ID, third is current xlog location and the fourth
+	 * contains the database name if we are connected to one.
 	 */
 
 	snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
@@ -265,9 +271,23 @@ IdentifySystem(void)
 
 	snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
 
+	if (MyDatabaseId != InvalidOid)
+	{
+		MemoryContext cur = CurrentMemoryContext;
+
+		/* syscache access needs a transaction env. */
+		StartTransactionCommand();
+		/* make dbname live outside TX context */
+		MemoryContextSwitchTo(cur);
+		dbname = get_database_name(MyDatabaseId);
+		CommitTransactionCommand();
+		/* CommitTransactionCommand switches to TopMemoryContext */
+		MemoryContextSwitchTo(cur);
+	}
+
 	/* Send a RowDescription message */
 	pq_beginmessage(&buf, 'T');
-	pq_sendint(&buf, 3, 2);		/* 3 fields */
+	pq_sendint(&buf, 4, 2);		/* 4 fields */
 
 	/* first field */
 	pq_sendstring(&buf, "systemid");	/* col name */
@@ -295,17 +315,28 @@ IdentifySystem(void)
 	pq_sendint(&buf, -1, 2);
 	pq_sendint(&buf, 0, 4);
 	pq_sendint(&buf, 0, 2);
+
+	/* fourth field */
+	pq_sendstring(&buf, "dbname");
+	pq_sendint(&buf, 0, 4);
+	pq_sendint(&buf, 0, 2);
+	pq_sendint(&buf, TEXTOID, 4);
+	pq_sendint(&buf, -1, 2);
+	pq_sendint(&buf, 0, 4);
+	pq_sendint(&buf, 0, 2);
 	pq_endmessage(&buf);
 
 	/* Send a DataRow message */
 	pq_beginmessage(&buf, 'D');
-	pq_sendint(&buf, 3, 2);		/* # of columns */
+	pq_sendint(&buf, 4, 2);		/* # of columns */
 	pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
 	pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
 	pq_sendint(&buf, strlen(tli), 4);	/* col2 len */
 	pq_sendbytes(&buf, (char *) tli, strlen(tli));
 	pq_sendint(&buf, strlen(xpos), 4);	/* col3 len */
 	pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
+	pq_sendint(&buf, strlen(dbname), 4);	/* col4 len */
+	pq_sendbytes(&buf, (char *) dbname, strlen(dbname));
 
 	pq_endmessage(&buf);
 }
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 2c7f0f1..56c352c 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -725,7 +725,12 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 			ereport(FATAL,
 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 					 errmsg("must be superuser or replication role to start walsender")));
+	}
 
+	if (am_walsender &&
+	    (in_dbname == NULL || in_dbname[0] == '\0') &&
+	    dboid == InvalidOid)
+	{
 		/* process any options passed in the startup packet */
 		if (MyProcPort != NULL)
 			process_startup_options(MyProcPort, am_superuser);
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index a1e12a8..89e2376 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -1361,11 +1361,11 @@ BaseBackup(void)
 				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
 		disconnect_and_exit(1);
 	}
-	if (PQntuples(res) != 1 || PQnfields(res) != 3)
+	if (PQntuples(res) != 1 || PQnfields(res) != 4)
 	{
 		fprintf(stderr,
 				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
-				progname, PQntuples(res), PQnfields(res), 1, 3);
+				progname, PQntuples(res), PQnfields(res), 1, 4);
 		disconnect_and_exit(1);
 	}
 	sysidentifier = pg_strdup(PQgetvalue(res, 0, 0));
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index 787a395..fe8aef6 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -252,11 +252,11 @@ StreamLog(void)
 				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
 		disconnect_and_exit(1);
 	}
-	if (PQntuples(res) != 1 || PQnfields(res) != 3)
+	if (PQntuples(res) != 1 || PQnfields(res) != 4)
 	{
 		fprintf(stderr,
 				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
-				progname, PQntuples(res), PQnfields(res), 1, 3);
+				progname, PQntuples(res), PQnfields(res), 1, 4);
 		disconnect_and_exit(1);
 	}
 	servertli = atoi(PQgetvalue(res, 0, 1));
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index d56a4d7..22a5340 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -534,11 +534,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			PQclear(res);
 			return false;
 		}
-		if (PQnfields(res) != 3 || PQntuples(res) != 1)
+		if (PQnfields(res) != 4 || PQntuples(res) != 1)
 		{
 			fprintf(stderr,
 					_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
-					progname, PQntuples(res), PQnfields(res), 1, 3);
+					progname, PQntuples(res), PQnfields(res), 1, 4);
 			PQclear(res);
 			return false;
 		}
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 2cc7ddf..5097235 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -19,6 +19,7 @@
 /* global state */
 extern bool am_walsender;
 extern bool am_cascading_walsender;
+extern bool am_db_walsender;
 extern bool wake_wal_senders;
 
 /* user-settable parameters */
-- 
1.8.3.251.g1462b67

>From 770c858ebebe229bb5239c8370fe25f51df0f2a6 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 19 Aug 2013 13:24:30 +0200
Subject: [PATCH 3/9] wal_decoding: Log xl_running_xact's at a higher frequency
 than checkpoints are done

Logging information about running xacts more frequently is beneficial both for
hot standby, which can reach consistency faster and release some resources
earlier using this information, and for future logical replication, which can
initialize more quickly using it.

Do so in the background writer, which seems to be the best choice as it's
running regularly and shouldn't be busy for too long without getting back into
its main loop.

Also mark xl_running_xact records as being relevant for async commit so the wal
writer writes them out soonish instead of possibly waiting a long time.
---
 src/backend/postmaster/bgwriter.c | 62 +++++++++++++++++++++++++++++++++++++++
 src/backend/storage/ipc/standby.c | 27 ++++++++++++++---
 src/include/storage/standby.h     |  2 +-
 3 files changed, 86 insertions(+), 5 deletions(-)

diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index 286ae86..13d57c5 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -54,9 +54,11 @@
 #include "storage/shmem.h"
 #include "storage/smgr.h"
 #include "storage/spin.h"
+#include "storage/standby.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
 #include "utils/resowner.h"
+#include "utils/timestamp.h"
 
 
 /*
@@ -71,6 +73,20 @@ int			BgWriterDelay = 200;
 #define HIBERNATE_FACTOR			50
 
 /*
+ * Interval in which standby snapshots are logged into the WAL stream, in
+ * milliseconds.
+ */
+#define LOG_SNAPSHOT_INTERVAL_MS 15000
+
+/*
+ * LSN and timestamp at which we last issued a LogStandbySnapshot(), to avoid
+ * doing so too often or repeatedly if there has been no other write activity
+ * in the system.
+ */
+static TimestampTz last_snapshot_ts;
+static XLogRecPtr last_snapshot_lsn = InvalidXLogRecPtr;
+
+/*
  * Flags set by interrupt handlers for later service in the main loop.
  */
 static volatile sig_atomic_t got_SIGHUP = false;
@@ -142,6 +158,12 @@ BackgroundWriterMain(void)
 	CurrentResourceOwner = ResourceOwnerCreate(NULL, "Background Writer");
 
 	/*
+	 * We just started, assume there has been either a shutdown or
+	 * end-of-recovery snapshot.
+	 */
+	last_snapshot_ts = GetCurrentTimestamp();
+
+	/*
 	 * Create a memory context that we will do all our work in.  We do this so
 	 * that we can reset the context during error recovery and thereby avoid
 	 * possible memory leaks.  Formerly this code just ran in
@@ -276,6 +298,46 @@ BackgroundWriterMain(void)
 		}
 
 		/*
+		 * Log a new xl_running_xacts every now and then so replication can get
+		 * into a consistent state faster (think of suboverflowed snapshots)
+		 * and clean up resources (locks, KnownXids*) more frequently. The
+		 * costs of this are relatively low, so doing it 4 times
+		 * (LOG_SNAPSHOT_INTERVAL_MS) a minute seems fine.
+		 *
+		 * We assume the interval for writing xl_running_xacts is
+		 * significantly bigger than BgWriterDelay, so we don't complicate the
+		 * overall timeout handling but just assume we're going to get called
+		 * often enough even if hibernation mode is active. It's not that
+		 * important that LOG_SNAPSHOT_INTERVAL_MS is met strictly. To make sure
+		 * we're not waking the disk up unnecessarily on an idle system we
+		 * check whether there has been any WAL inserted since the last time
+		 * we've logged a running xacts record.
+		 *
+		 * We do this logging in the bgwriter as it's the only process that's
+		 * run regularly and returns to its main loop all the
+		 * time. E.g. the checkpointer, when active, is barely ever in its
+		 * main loop, which makes it hard to log regularly there.
+		 */
+		if (XLogStandbyInfoActive() && !RecoveryInProgress())
+		{
+			TimestampTz timeout = 0;
+			TimestampTz now = GetCurrentTimestamp();
+			timeout = TimestampTzPlusMilliseconds(last_snapshot_ts,
+												  LOG_SNAPSHOT_INTERVAL_MS);
+
+			/*
+			 * only log if enough time has passed and some xlog record has been
+			 * inserted.
+			 */
+			if (now >= timeout &&
+				last_snapshot_lsn != GetXLogInsertRecPtr())
+			{
+				last_snapshot_lsn = LogStandbySnapshot();
+				last_snapshot_ts = now;
+			}
+		}
+
+		/*
 		 * Sleep until we are signaled or BgWriterDelay has elapsed.
 		 *
 		 * Note: the feedback control loop in BgBufferSync() expects that we
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index c704412..97da1a0 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -42,7 +42,7 @@ static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlis
 									   ProcSignalReason reason);
 static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
 static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
-static void LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
+static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
 static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
 
 
@@ -853,10 +853,13 @@ standby_redo(XLogRecPtr lsn, XLogRecord *record)
  * currently running xids, performed by StandbyReleaseOldLocks().
  * Zero xids should no longer be possible, but we may be replaying WAL
  * from a time when they were possible.
+ *
+ * Returns the RecPtr of the last inserted record.
  */
-void
+XLogRecPtr
 LogStandbySnapshot(void)
 {
+	XLogRecPtr recptr;
 	RunningTransactions running;
 	xl_standby_lock *locks;
 	int			nlocks;
@@ -876,9 +879,12 @@ LogStandbySnapshot(void)
 	 * record we write, because standby will open up when it sees this.
 	 */
 	running = GetRunningTransactionData();
-	LogCurrentRunningXacts(running);
+	recptr = LogCurrentRunningXacts(running);
+
 	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
 	LWLockRelease(XidGenLock);
+
+	return recptr;
 }
 
 /*
@@ -889,7 +895,7 @@ LogStandbySnapshot(void)
  * is a contiguous chunk of memory and never exists fully until it is
  * assembled in WAL.
  */
-static void
+static XLogRecPtr
 LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 {
 	xl_running_xacts xlrec;
@@ -939,6 +945,19 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 			 CurrRunningXacts->oldestRunningXid,
 			 CurrRunningXacts->latestCompletedXid,
 			 CurrRunningXacts->nextXid);
+
+	/*
+	 * Ensure running_xacts information is synced to disk not too far in the
+	 * future. We don't want to stall anything though (i.e. use XLogFlush()),
+	 * so we let the wal writer do it during normal
+	 * operation. XLogSetAsyncXactLSN() conveniently will mark the LSN as
+	 * to-be-synced and nudge the WALWriter into action if sleeping. Check
+	 * XLogBackgroundFlush() for details why a record might not be flushed
+	 * without it.
+	 */
+	XLogSetAsyncXactLSN(recptr);
+
+	return recptr;
 }
 
 /*
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 7f3f051..d4a8fe4 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -113,6 +113,6 @@ typedef RunningTransactionsData *RunningTransactions;
 extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid);
 extern void LogAccessExclusiveLockPrepare(void);
 
-extern void LogStandbySnapshot(void);
+extern XLogRecPtr LogStandbySnapshot(void);
 
 #endif   /* STANDBY_H */
-- 
1.8.3.251.g1462b67

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to