On Fri, 2011-01-21 at 14:45 +0200, Heikki Linnakangas wrote:

> * The UI differs from what was agreed on here: 
> http://archives.postgresql.org/message-id/4d1dcf5a.7070...@enterprisedb.com.

This patch adds a server_name parameter, plus a mechanism to send info from
the standby to the master. While doing that, I refactored the standby reply
into 3 message types instead of just 1. This addresses Fujii's comment that we
may not wish to send feedback as often as other replies, but it does not yet
change when the feedback is sent (nor will I do that anytime soon).

This is a complete but rough hack, posted for comments; there is nothing surprising in it.

-- 
 Simon Riggs           http://www.2ndQuadrant.com/books/
 PostgreSQL Development, 24x7 Support, Training and Services
 
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ee09468..ff89035 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -95,6 +95,7 @@ static struct
 }	LogstreamResult;
 
 static StandbyReplyMessage	reply_message;
+static StandbyHSFeedbackMessage	feedback_message;
 
 /*
  * About SIGTERM handling:
@@ -123,6 +124,8 @@ static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(bool dying);
 static void XLogWalRcvSendReply(void);
+static void XLogWalRcvSendHSFeedback(void);
+static void XLogWalRcvSendInfo(void);
 
 /* Signal handlers */
 static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -303,6 +306,7 @@ WalReceiverMain(void)
 		{
 			got_SIGHUP = false;
 			ProcessConfigFile(PGC_SIGHUP);
+			XLogWalRcvSendInfo();
 		}
 
 		/* Wait a while for data to arrive */
@@ -317,6 +321,7 @@ WalReceiverMain(void)
 
 			/* Let the master know that we received some data. */
 			XLogWalRcvSendReply();
+			XLogWalRcvSendHSFeedback();
 
 			/*
 			 * If we've written some records, flush them to disk and let the
@@ -331,6 +336,7 @@ WalReceiverMain(void)
 			 * the master anyway, to report any progress in applying WAL.
 			 */
 			XLogWalRcvSendReply();
+			XLogWalRcvSendHSFeedback();
 		}
 	}
 }
@@ -619,40 +625,101 @@ XLogWalRcvSendReply(void)
 	reply_message.apply = GetXLogReplayRecPtr();
 	reply_message.sendTime = now;

+	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
+				 reply_message.write.xlogid, reply_message.write.xrecoff,
+				 reply_message.flush.xlogid, reply_message.flush.xrecoff,
+				 reply_message.apply.xlogid, reply_message.apply.xrecoff);
+
+	/* Prepend with the message type and send it. */
+	buf[0] = 'r';
+	memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage));
+	walrcv_send(buf, sizeof(StandbyReplyMessage) + 1);
+}
+
+/*
+ * Send hot standby feedback to the primary: the oldest xmin on this standby
+ * and its epoch, plus the sender's current time.
+ *
+ * Does nothing unless hot_standby_feedback is enabled and hot standby is
+ * active.
+ */
+static void
+XLogWalRcvSendHSFeedback(void)
+{
+	char		buf[sizeof(StandbyHSFeedbackMessage) + 1];
+	TimestampTz	now;
+
+	/*
+	 * If the user doesn't want status to be reported to the master, be sure
+	 * to exit before doing anything at all.
+	 */
+	if (!hot_standby_feedback || !HotStandbyActive())
+		return;
+
+	/* Get current timestamp, and stamp the message with it. */
+	now = GetCurrentTimestamp();
+	feedback_message.sendTime = now;
+
 	/*
 	 * Get the OldestXmin and its associated epoch
 	 */
-	if (hot_standby_feedback && HotStandbyActive())
 	{
 		TransactionId	nextXid;
 		uint32			nextEpoch;

-		reply_message.xmin = GetOldestXmin(true, false);
+		feedback_message.xmin = GetOldestXmin(true, false);

 		/*
 		 * Get epoch and adjust if nextXid and oldestXmin are different
 		 * sides of the epoch boundary.
 		 */
 		GetNextXidAndEpoch(&nextXid, &nextEpoch);
-		if (nextXid < reply_message.xmin)
+		if (nextXid < feedback_message.xmin)
 			nextEpoch--;
-		reply_message.epoch = nextEpoch;
-	}
-	else
-	{
-		reply_message.xmin = InvalidTransactionId;
-		reply_message.epoch = 0;
+		feedback_message.epoch = nextEpoch;
 	}

-	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
-				 reply_message.write.xlogid, reply_message.write.xrecoff,
-				 reply_message.flush.xlogid, reply_message.flush.xrecoff,
-				 reply_message.apply.xlogid, reply_message.apply.xrecoff,
-				 reply_message.xmin,
-				 reply_message.epoch);
+	elog(DEBUG2, "sending xmin %u epoch %u",
+				 feedback_message.xmin,
+				 feedback_message.epoch);

 	/* Prepend with the message type and send it. */
-	buf[0] = 'r';
-	memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage));
-	walrcv_send(buf, sizeof(StandbyReplyMessage) + 1);
+	buf[0] = 'h';
+	memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
+	walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
+}
+
+/*
+ * Send an info message ('i') to the primary, carrying this server's
+ * server_name setting and the sender's current time.
+ */
+static void
+XLogWalRcvSendInfo(void)
+{
+	char		buf[sizeof(StandbyInfoMessage) + 1];
+	StandbyInfoMessage	info_message;
+
+	/*
+	 * Zero the struct so that padding bytes and the unused tail of
+	 * servername are not sent over the wire as uninitialized stack memory.
+	 */
+	MemSet(&info_message, 0, sizeof(StandbyInfoMessage));
+
+	/* Get current timestamp. */
+	info_message.sendTime = GetCurrentTimestamp();
+
+	/*
+	 * Copy the server name, truncating it if it doesn't fit.  Unlike strncpy,
+	 * strlcpy is bounded by the destination size and always NUL-terminates.
+	 */
+	strlcpy(info_message.servername, ServerName,
+			sizeof(info_message.servername));
+
+	elog(DEBUG2, "sending servername %s",
+				 info_message.servername);
+
+	/* Prepend with the message type and send it. */
+	buf[0] = 'i';
+	memcpy(&buf[1], &info_message, sizeof(StandbyInfoMessage));
+	walrcv_send(buf, sizeof(StandbyInfoMessage) + 1);
 }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index a6a7a14..e46cd01 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -116,7 +116,10 @@ static void WalSndKill(int code, Datum arg);
 static bool XLogSend(char *msgbuf, bool *caughtup);
 static void IdentifySystem(void);
 static void StartReplication(StartReplicationCmd * cmd);
+static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
+static void ProcessStandbyHSFeedbackMessage(void);
+static void ProcessStandbyInfoMessage(void);
 static void ProcessRepliesIfAny(void);
 
 
@@ -456,42 +459,45 @@ ProcessRepliesIfAny(void)
 	unsigned char firstchar;
 	int			r;
 
-	r = pq_getbyte_if_available(&firstchar);
-	if (r < 0)
-	{
-		/* unexpected error or EOF */
-		ereport(COMMERROR,
-				(errcode(ERRCODE_PROTOCOL_VIOLATION),
-				 errmsg("unexpected EOF on standby connection")));
-		proc_exit(0);
-	}
-	if (r == 0)
+	for (;;)
 	{
-		/* no data available without blocking */
-		return;
-	}
+		r = pq_getbyte_if_available(&firstchar);
+		if (r < 0)
+		{
+			/* unexpected error or EOF */
+			ereport(COMMERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("unexpected EOF on standby connection")));
+			proc_exit(0);
+		}
+		if (r == 0)
+		{
+			/* no more data available without blocking */
+			return;
+		}
 
-	/* Handle the very limited subset of commands expected in this phase */
-	switch (firstchar)
-	{
-			/*
-			 * 'd' means a standby reply wrapped in a CopyData packet.
-			 */
-		case 'd':
-			ProcessStandbyReplyMessage();
-			break;
+		/* Handle the very limited subset of commands expected in this phase */
+		switch (firstchar)
+		{
+				/*
+				 * 'd' means a standby message ('r', 'h' or 'i') in CopyData.
+				 */
+			case 'd':
+				ProcessStandbyMessage();
+				break;
 
-			/*
-			 * 'X' means that the standby is closing down the socket.
-			 */
-		case 'X':
-			proc_exit(0);
+				/*
+				 * 'X' means that the standby is closing down the socket.
+				 */
+			case 'X':
+				proc_exit(0);
 
-		default:
-			ereport(FATAL,
-					(errcode(ERRCODE_PROTOCOL_VIOLATION),
-					 errmsg("invalid standby closing message type %d",
-							firstchar)));
+			default:
+				ereport(FATAL,
+						(errcode(ERRCODE_PROTOCOL_VIOLATION),
+						 errmsg("invalid standby closing message type %d",
+								firstchar)));
+		}
 	}
 }
 
@@ -499,11 +505,9 @@ ProcessRepliesIfAny(void)
  * Process a status update message received from standby.
  */
 static void
-ProcessStandbyReplyMessage(void)
+ProcessStandbyMessage(void)
 {
-	StandbyReplyMessage	reply;
 	char msgtype;
-	TransactionId newxmin = InvalidTransactionId;
 
 	resetStringInfo(&reply_message);
 
@@ -523,22 +527,43 @@ ProcessStandbyReplyMessage(void)
 	 * one type.
 	 */
 	msgtype = pq_getmsgbyte(&reply_message);
-	if (msgtype != 'r')
+
+	switch (msgtype)
 	{
-		ereport(COMMERROR,
-				(errcode(ERRCODE_PROTOCOL_VIOLATION),
-				 errmsg("unexpected message type %c", msgtype)));
-		proc_exit(0);
+		case 'r':
+			ProcessStandbyReplyMessage();
+			break;
+
+		case 'h':
+			ProcessStandbyHSFeedbackMessage();
+			break;
+
+		case 'i':
+			ProcessStandbyInfoMessage();
+			break;
+
+		default:
+			ereport(COMMERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("unexpected message type %c", msgtype)));
+			proc_exit(0);
 	}
+}
+
+/*
+ * Regular reply from standby advising of WAL positions on standby server.
+ */
+static void
+ProcessStandbyReplyMessage(void)
+{
+	StandbyReplyMessage	reply;
 
 	pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
 
-	elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
+	elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
 		 reply.write.xlogid, reply.write.xrecoff,
 		 reply.flush.xlogid, reply.flush.xrecoff,
-		 reply.apply.xlogid, reply.apply.xrecoff,
-		 reply.xmin,
-		 reply.epoch);
+		 reply.apply.xlogid, reply.apply.xrecoff);
 
 	/*
 	 * Update shared state for this WalSender process
@@ -554,6 +579,22 @@ ProcessStandbyReplyMessage(void)
 		walsnd->apply = reply.apply;
 		SpinLockRelease(&walsnd->mutex);
 	}
+}
+
+/*
+ * Hot Standby feedback
+ */
+static void
+ProcessStandbyHSFeedbackMessage(void)
+{
+	StandbyHSFeedbackMessage	reply;
+	TransactionId newxmin = InvalidTransactionId;
+
+	pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage));
+
+	elog(DEBUG2, "xmin %u epoch %u",
+		 reply.xmin,
+		 reply.epoch);
 
 	/*
 	 * Update the WalSender's proc xmin to allow it to be visible
@@ -619,6 +660,26 @@ ProcessStandbyReplyMessage(void)
 	}
 }
 
+/*
+ * Process an info message from the standby, which carries the standby's
+ * server name and send time.
+ */
+static void
+ProcessStandbyInfoMessage(void)
+{
+	StandbyInfoMessage	info;
+
+	pq_copymsgbytes(&reply_message, (char *) &info, sizeof(StandbyInfoMessage));
+
+	/*
+	 * servername comes from the network and need not be NUL-terminated;
+	 * force termination so the elog below cannot read past the buffer.
+	 */
+	info.servername[sizeof(info.servername) - 1] = '\0';
+
+	elog(DEBUG2, "server name %s", info.servername);
+}
+
 /* Main loop of walsender process */
 static int
 WalSndLoop(void)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 55cbf75..de6de82 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -405,6 +405,9 @@ int			tcp_keepalives_idle;
 int			tcp_keepalives_interval;
 int			tcp_keepalives_count;
 
+/* Backing variable for the server_name GUC (boot value ""). */
+char		*ServerName = NULL;
+
 /*
  * These variables are all dummies that don't do anything, except in some
  * cases provide the value for SHOW to display.  The real state is elsewhere
@@ -2365,6 +2368,15 @@ static struct config_string ConfigureNamesString[] =
 	},
 
 	{
+		{"server_name", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+			gettext_noop("Allows setting of a unique name for this server."),
+			NULL
+		},
+		&ServerName,
+		"", NULL, NULL
+	},
+
+	{
 		{"temp_tablespaces", PGC_USERSET, CLIENT_CONN_STATEMENT,
 			gettext_noop("Sets the tablespace(s) to use for temporary tables and sort files."),
 			NULL,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 6726733..cbe6fb2 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -56,6 +56,8 @@
 
 # - Connection Settings -
 
+#server_name = ''			# optional server name for use when clustering servers
+					# (change requires restart)
 #listen_addresses = 'localhost'		# what IP address(es) to listen on;
 					# comma-separated list of addresses;
 					# defaults to 'localhost', '*' = all
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index aa8cce5..bf6c262 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -218,6 +218,7 @@ extern int	CTimeZone;
 
 #define MAXTZLEN		10		/* max TZ name len, not counting tr. null */
 
+extern char	*ServerName;	/* server_name GUC; defined in guc.c */
 extern bool enableFsync;
 extern bool allowSystemTableMods;
 extern PGDLLIMPORT int work_mem;
diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h
index da94b6b..48cb333 100644
--- a/src/include/replication/walprotocol.h
+++ b/src/include/replication/walprotocol.h
@@ -56,6 +56,18 @@ typedef struct
 	XLogRecPtr	flush;
 	XLogRecPtr	apply;
 
+	/* Sender's system clock at the time of transmission */
+	TimestampTz sendTime;
+} StandbyReplyMessage;
+
+/*
+ * Hot Standby feedback from standby (message type 'h').  This is wrapped within
+ * a CopyData message at the FE/BE protocol level.
+ *
+ * Note that the data length is not specified here.
+ */
+typedef struct
+{
 	/*
 	 * The current xmin and epoch from the standby, for Hot Standby feedback.
 	 * This may be invalid if the standby-side does not support feedback,
@@ -64,10 +76,23 @@ typedef struct
 	TransactionId	xmin;
 	uint32			epoch;
 
+	/* Sender's system clock at the time of transmission */
+	TimestampTz sendTime;
+} StandbyHSFeedbackMessage;
+
+/*
+ * Info message from standby (message type 'i'), carrying the standby's
+ * server_name.  This is wrapped within a CopyData message at the FE/BE
+ * protocol level; the data length is not specified here.  Receivers
+ * should not assume servername is NUL-terminated.
+ */
+typedef struct
+{
+	char		servername[64];
 
 	/* Sender's system clock at the time of transmission */
 	TimestampTz sendTime;
-} StandbyReplyMessage;
+} StandbyInfoMessage;
 
 /*
  * Maximum data payload in a WAL data message.	Must be >= XLOG_BLCKSZ.
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to