Hi,

I've attached a couple of the preliminary patches to $subject, which I've
recently cleaned up in the hope that we can continue improving on those
in a piecemeal fashion.
I am preparing a newer version of the major patch for submission, but
unfortunately progress on that is slower than I'd like...

In descending order of their chance of being applied individually, they are:

0005 wal_decoding: Log xl_running_xact's at a higher frequency than checkpoints 
are done
* benefits hot standby startup
0003 wal_decoding: Allow walsender's to connect to a specific database
* biggest problem is how to specify the database we connect to.
  Currently with the patch, a walsender connects to a database if the
  dbname it is given is not "replication". Perhaps it's better to invent
  a replication_dbname parameter?
0006 wal_decoding: copydir: move fsync_fname to fd.[c.h] and make it public
* Pretty trivial and boring.
0007 wal_decoding: Add information about a tables primary key to struct 
RelationData
* Could be used in the matview refresh code
0002 wal_decoding: Introduce InvalidCommandId and declare that to be the new 
maximum for CommandCounterIncrement

Greetings,

Andres Freund

-- 
 Andres Freund                     http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 3442c3a4e44c5a64efbe651b745a6f86f69cfdab Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 19 Aug 2013 13:24:30 +0200
Subject: [PATCH 02/13] wal_decoding: Introduce InvalidCommandId and declare
 that to be the new maximum for CommandCounterIncrement

This is useful to be able to represent a CommandId that's invalid. There was no
such value before.

This decreases the possible number of commands in a transaction by one, which
seems unproblematic. It's also not a problem for pg_upgrade because cmin/cmax
are never looked at outside the context of their own transaction (except for
timetravel access, but that's new anyway).
---
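For reviewers, a minimal standalone sketch of the idea, not the patch itself:
reserving the all-ones value as an invalid sentinel means the counter has to
stop one step earlier. The CommandId typedef and the two macros mirror the
c.h hunk further down; command_id_advance() is a hypothetical helper that
exists only for this illustration.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t CommandId;

#define FirstCommandId		((CommandId) 0)
#define InvalidCommandId	(~(CommandId) 0)	/* all bits set */

/*
 * Hypothetical helper: advance *cid by one, but never onto the reserved
 * InvalidCommandId. Returns false and leaves *cid unchanged when the
 * limit has been reached.
 */
static bool
command_id_advance(CommandId *cid)
{
	CommandId	next = (CommandId) (*cid + 1);

	if (next == InvalidCommandId)
		return false;

	*cid = next;
	return true;
}
```

Starting from FirstCommandId, the last value this helper ever hands out is
InvalidCommandId - 1, so the sentinel cannot collide with a real command id.
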
 src/backend/access/transam/xact.c | 4 ++--
 src/include/c.h                   | 1 +
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 31e868d..0591f3f 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -766,12 +766,12 @@ CommandCounterIncrement(void)
 	if (currentCommandIdUsed)
 	{
 		currentCommandId += 1;
-		if (currentCommandId == FirstCommandId) /* check for overflow */
+		if (currentCommandId == InvalidCommandId)
 		{
 			currentCommandId -= 1;
 			ereport(ERROR,
 					(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-					 errmsg("cannot have more than 2^32-1 commands in a transaction")));
+					 errmsg("cannot have more than 2^32-2 commands in a transaction")));
 		}
 		currentCommandIdUsed = false;
 
diff --git a/src/include/c.h b/src/include/c.h
index 5961183..14bfdcd 100644
--- a/src/include/c.h
+++ b/src/include/c.h
@@ -368,6 +368,7 @@ typedef uint32 MultiXactOffset;
 typedef uint32 CommandId;
 
 #define FirstCommandId	((CommandId) 0)
+#define InvalidCommandId	(~(CommandId)0)
 
 /*
  * Array indexing support
-- 
1.8.3.251.g1462b67

From ac48fc2f5c5f0031494cfabb0bca46f0bbef47d2 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 19 Aug 2013 13:24:30 +0200
Subject: [PATCH 03/13] wal_decoding: Allow walsender's to connect to a
 specific database

Currently the decision whether to connect to a database or not is made by
checking whether the passed "dbname" parameter is "replication". Unfortunately
this makes it impossible to connect to a database that is actually named
"replication"... Possibly it would be better to use a separate connection
parameter like replication_dbname=xxx?

This is useful for future walsender commands which need database interaction.
---
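To make the ambiguity concrete, here is a small standalone sketch of the
decision rule described above. It is only an illustration:
walsender_target_db() is a hypothetical name, and the real check lives in the
postmaster.c hunk below.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper: return the database a connection should attach to,
 * or NULL for a walsender that is not connected to any database.
 */
static const char *
walsender_target_db(bool am_walsender, const char *dbname)
{
	/* the magic name "replication" means "no database" for walsenders */
	if (am_walsender && strcmp(dbname, "replication") == 0)
		return NULL;

	return dbname;
}
```

With this rule a regular backend can still connect to a database called
"replication", but a walsender never can, which is the limitation a separate
parameter would avoid.
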
 doc/src/sgml/protocol.sgml                         |  5 +++-
 src/backend/postmaster/postmaster.c                | 13 +++++++++--
 .../libpqwalreceiver/libpqwalreceiver.c            |  4 ++--
 src/backend/replication/walsender.c                | 27 ++++++++++++++++++----
 src/backend/utils/init/postinit.c                  |  5 ++++
 src/bin/pg_basebackup/pg_basebackup.c              |  4 ++--
 src/bin/pg_basebackup/pg_receivexlog.c             |  4 ++--
 src/bin/pg_basebackup/receivelog.c                 |  4 ++--
 8 files changed, 51 insertions(+), 15 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 0b2e60e..51b4435 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1304,7 +1304,10 @@ To initiate streaming replication, the frontend sends the
 <literal>replication</> parameter in the startup message. This tells the
 backend to go into walsender mode, wherein a small set of replication commands
 can be issued instead of SQL statements. Only the simple query protocol can be
-used in walsender mode.
+used in walsender mode. A <literal>dbname</> of <literal>replication</> will
+start a walsender not connected to any database; specifying any other name
+will connect to that database. Connecting to a specific database is only
+required for logical replication so far.
 
 The commands accepted in walsender mode are:
 
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 01d2618..7520e42 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1983,9 +1983,18 @@ retry1:
 	if (strlen(port->user_name) >= NAMEDATALEN)
 		port->user_name[NAMEDATALEN - 1] = '\0';
 
-	/* Walsender is not related to a particular database */
-	if (am_walsender)
+	/*
+	 * Generic walsender, e.g. for streaming replication, is not connected to a
+	 * particular database. But walsenders used for logical replication need to
+	 * connect to a specific database. Unfortunately the initial choices for
+	 * distinguishing normal connections from replication connections included
+	 * dbname=replication being specified for the latter.
+	 * We now assume that a database name
+	 */
+	if (am_walsender && strcmp(port->database_name, "replication") == 0)
 		port->database_name[0] = '\0';
+	else if (am_walsender)
+		elog(DEBUG1, "WAL sender attaching to database %s", port->database_name);
 
 	/*
 	 * Done putting stuff in TopMemoryContext.
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 6bc0aa1..ee0f1fe 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -130,7 +130,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
 						"the primary server: %s",
 						PQerrorMessage(streamConn))));
 	}
-	if (PQnfields(res) != 3 || PQntuples(res) != 1)
+	if (PQnfields(res) != 4 || PQntuples(res) != 1)
 	{
 		int			ntuples = PQntuples(res);
 		int			nfields = PQnfields(res);
@@ -138,7 +138,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
 		PQclear(res);
 		ereport(ERROR,
 				(errmsg("invalid response from primary server"),
-				 errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.",
+				 errdetail("Expected 1 tuple with 4 fields, got %d tuples with %d fields.",
 						   ntuples, nfields)));
 	}
 	primary_sysid = PQgetvalue(res, 0, 0);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index eae6e59..f6463fc 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -47,6 +47,7 @@
 #include "access/transam.h"
 #include "access/xlog_internal.h"
 #include "catalog/pg_type.h"
+#include "commands/dbcommands.h"
 #include "funcapi.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -243,10 +244,12 @@ IdentifySystem(void)
 	char		tli[11];
 	char		xpos[MAXFNAMELEN];
 	XLogRecPtr	logptr;
+	char	   *dbname = NULL;
 
 	/*
-	 * Reply with a result set with one row, three columns. First col is
-	 * system ID, second is timeline ID, and third is current xlog location.
+	 * Reply with a result set with one row, four columns. First col is system
+	 * ID, second is timeline ID, third is current xlog location and the fourth
+	 * contains the database name if we are connected to one.
 	 */
 
 	snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
@@ -265,9 +268,14 @@ IdentifySystem(void)
 
 	snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
 
+	if (MyDatabaseId != InvalidOid)
+		dbname = get_database_name(MyDatabaseId);
+	else
+		dbname = "(none)";
+
 	/* Send a RowDescription message */
 	pq_beginmessage(&buf, 'T');
-	pq_sendint(&buf, 3, 2);		/* 3 fields */
+	pq_sendint(&buf, 4, 2);		/* 4 fields */
 
 	/* first field */
 	pq_sendstring(&buf, "systemid");	/* col name */
@@ -295,17 +303,28 @@ IdentifySystem(void)
 	pq_sendint(&buf, -1, 2);
 	pq_sendint(&buf, 0, 4);
 	pq_sendint(&buf, 0, 2);
+
+	/* fourth field */
+	pq_sendstring(&buf, "dbname");
+	pq_sendint(&buf, 0, 4);
+	pq_sendint(&buf, 0, 2);
+	pq_sendint(&buf, TEXTOID, 4);
+	pq_sendint(&buf, -1, 2);
+	pq_sendint(&buf, 0, 4);
+	pq_sendint(&buf, 0, 2);
 	pq_endmessage(&buf);
 
 	/* Send a DataRow message */
 	pq_beginmessage(&buf, 'D');
-	pq_sendint(&buf, 3, 2);		/* # of columns */
+	pq_sendint(&buf, 4, 2);		/* # of columns */
 	pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
 	pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
 	pq_sendint(&buf, strlen(tli), 4);	/* col2 len */
 	pq_sendbytes(&buf, (char *) tli, strlen(tli));
 	pq_sendint(&buf, strlen(xpos), 4);	/* col3 len */
 	pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
+	pq_sendint(&buf, strlen(dbname), 4);	/* col4 len */
+	pq_sendbytes(&buf, (char *) dbname, strlen(dbname));
 
 	pq_endmessage(&buf);
 }
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 2c7f0f1..56c352c 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -725,7 +725,12 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 			ereport(FATAL,
 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 					 errmsg("must be superuser or replication role to start walsender")));
+	}
 
+	if (am_walsender &&
+	    (in_dbname == NULL || in_dbname[0] == '\0') &&
+	    dboid == InvalidOid)
+	{
 		/* process any options passed in the startup packet */
 		if (MyProcPort != NULL)
 			process_startup_options(MyProcPort, am_superuser);
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index a1e12a8..89e2376 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -1361,11 +1361,11 @@ BaseBackup(void)
 				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
 		disconnect_and_exit(1);
 	}
-	if (PQntuples(res) != 1 || PQnfields(res) != 3)
+	if (PQntuples(res) != 1 || PQnfields(res) != 4)
 	{
 		fprintf(stderr,
 				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
-				progname, PQntuples(res), PQnfields(res), 1, 3);
+				progname, PQntuples(res), PQnfields(res), 1, 4);
 		disconnect_and_exit(1);
 	}
 	sysidentifier = pg_strdup(PQgetvalue(res, 0, 0));
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index 787a395..fe8aef6 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -252,11 +252,11 @@ StreamLog(void)
 				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
 		disconnect_and_exit(1);
 	}
-	if (PQntuples(res) != 1 || PQnfields(res) != 3)
+	if (PQntuples(res) != 1 || PQnfields(res) != 4)
 	{
 		fprintf(stderr,
 				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
-				progname, PQntuples(res), PQnfields(res), 1, 3);
+				progname, PQntuples(res), PQnfields(res), 1, 4);
 		disconnect_and_exit(1);
 	}
 	servertli = atoi(PQgetvalue(res, 0, 1));
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index d56a4d7..22a5340 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -534,11 +534,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			PQclear(res);
 			return false;
 		}
-		if (PQnfields(res) != 3 || PQntuples(res) != 1)
+		if (PQnfields(res) != 4 || PQntuples(res) != 1)
 		{
 			fprintf(stderr,
 					_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
-					progname, PQntuples(res), PQnfields(res), 1, 3);
+					progname, PQntuples(res), PQnfields(res), 1, 4);
 			PQclear(res);
 			return false;
 		}
-- 
1.8.3.251.g1462b67

From 23fbc42744fbe74d0ea230dcab8275011177c7a6 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 19 Aug 2013 13:24:30 +0200
Subject: [PATCH 05/13] wal_decoding: Log xl_running_xact's at a higher
 frequency than checkpoints are done

Logging information about running xacts more frequently is beneficial for both
hot standby, which can reach consistency faster and release some resources
earlier using this information, and future logical replication, which can
initialize quicker using this.

Do so in the background writer, which seems to be the best choice as it's
regularly running and shouldn't be busy for too long without getting back into
its main loop.

Also mark xl_running_xacts records as being relevant for async commit so the
wal writer writes them out soon instead of possibly waiting a long time.
---
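The throttling rule can be sketched in isolation as follows. This is a
simplified model under assumptions: SnapLogState and should_log_snapshot()
are hypothetical names, and plain integers stand in for TimestampTz and
XLogRecPtr.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical state, standing in for the static variables in bgwriter.c */
typedef struct SnapLogState
{
	int64_t		last_ts_ms;		/* when a snapshot was last logged */
	uint64_t	last_recptr;	/* WAL position recorded at that time */
} SnapLogState;

/*
 * Log only if the interval has elapsed AND some WAL has been inserted
 * since the last snapshot; an idle system therefore writes nothing.
 */
static bool
should_log_snapshot(const SnapLogState *st, int64_t now_ms,
					uint64_t insert_recptr, int64_t interval_ms)
{
	return now_ms >= st->last_ts_ms + interval_ms &&
		st->last_recptr != insert_recptr;
}
```

The second condition is what keeps a completely idle server from generating
WAL every 15 seconds just to describe itself.
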
 src/backend/postmaster/bgwriter.c | 48 +++++++++++++++++++++++++++++++++++++++
 src/backend/storage/ipc/standby.c | 25 ++++++++++++++++----
 src/include/storage/standby.h     |  2 +-
 3 files changed, 70 insertions(+), 5 deletions(-)

diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index 286ae86..dd62917 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -54,9 +54,11 @@
 #include "storage/shmem.h"
 #include "storage/smgr.h"
 #include "storage/spin.h"
+#include "storage/standby.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
 #include "utils/resowner.h"
+#include "utils/timestamp.h"
 
 
 /*
@@ -76,6 +78,10 @@ int			BgWriterDelay = 200;
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t shutdown_requested = false;
 
+static TimestampTz last_logged_snap_ts;
+static XLogRecPtr last_logged_snap_recptr = InvalidXLogRecPtr;
+static uint32 log_snap_interval_ms = 15000;
+
 /* Signal handlers */
 
 static void bg_quickdie(SIGNAL_ARGS);
@@ -142,6 +148,12 @@ BackgroundWriterMain(void)
 	CurrentResourceOwner = ResourceOwnerCreate(NULL, "Background Writer");
 
 	/*
+	 * We just started, assume there has been either a shutdown or
+	 * end-of-recovery snapshot.
+	 */
+	last_logged_snap_ts = GetCurrentTimestamp();
+
+	/*
 	 * Create a memory context that we will do all our work in.  We do this so
 	 * that we can reset the context during error recovery and thereby avoid
 	 * possible memory leaks.  Formerly this code just ran in
@@ -276,6 +288,42 @@ BackgroundWriterMain(void)
 		}
 
 		/*
+		 * Log a new xl_running_xacts every now and then so replication can get
+		 * into a consistent state faster and clean up resources more
+		 * frequently. The costs of this are relatively low, so doing it 4
+		 * times a minute seems fine.
+		 *
+		 * We assume the interval for writing xl_running_xacts is significantly
+		 * bigger than BgWriterDelay, so we don't complicate the overall
+		 * timeout handling but just assume we're going to get called often
+		 * enough even if hibernation mode is active. It's not that important
+		 * that log_snap_interval_ms is met strictly.
+		 *
+		 * We do this logging in the bgwriter as it's the only process that
+		 * runs regularly and returns to its main loop all the time. The
+		 * checkpointer, for example, is barely ever in its main loop when
+		 * active.
+		 */
+		if (XLogStandbyInfoActive() && !RecoveryInProgress())
+		{
+			TimestampTz timeout = 0;
+			TimestampTz now = GetCurrentTimestamp();
+			timeout = TimestampTzPlusMilliseconds(last_logged_snap_ts,
+												  log_snap_interval_ms);
+
+			/*
+			 * only log if enough time has passed and some xlog record has been
+			 * inserted.
+			 */
+			if (now >= timeout &&
+				last_logged_snap_recptr != GetXLogInsertRecPtr())
+			{
+				last_logged_snap_recptr = LogStandbySnapshot();
+				last_logged_snap_ts = now;
+			}
+		}
+
+		/*
 		 * Sleep until we are signaled or BgWriterDelay has elapsed.
 		 *
 		 * Note: the feedback control loop in BgBufferSync() expects that we
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index c704412..6f0de13 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -42,7 +42,7 @@ static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlis
 									   ProcSignalReason reason);
 static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
 static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
-static void LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
+static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
 static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
 
 
@@ -853,10 +853,13 @@ standby_redo(XLogRecPtr lsn, XLogRecord *record)
  * currently running xids, performed by StandbyReleaseOldLocks().
  * Zero xids should no longer be possible, but we may be replaying WAL
  * from a time when they were possible.
+ *
+ * Returns the RecPtr of the last inserted record.
  */
-void
+XLogRecPtr
 LogStandbySnapshot(void)
 {
+	XLogRecPtr recptr;
 	RunningTransactions running;
 	xl_standby_lock *locks;
 	int			nlocks;
@@ -876,9 +879,12 @@ LogStandbySnapshot(void)
 	 * record we write, because standby will open up when it sees this.
 	 */
 	running = GetRunningTransactionData();
-	LogCurrentRunningXacts(running);
+	recptr = LogCurrentRunningXacts(running);
+
 	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
 	LWLockRelease(XidGenLock);
+
+	return recptr;
 }
 
 /*
@@ -889,7 +895,7 @@ LogStandbySnapshot(void)
  * is a contiguous chunk of memory and never exists fully until it is
  * assembled in WAL.
  */
-static void
+static XLogRecPtr
 LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 {
 	xl_running_xacts xlrec;
@@ -939,6 +945,17 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 			 CurrRunningXacts->oldestRunningXid,
 			 CurrRunningXacts->latestCompletedXid,
 			 CurrRunningXacts->nextXid);
+
+	/*
+	 * Ensure running_xacts information is synced to disk not too far in the
+	 * future. We don't want to stall anything though, so we let the wal writer
+	 * do it during normal operation. XLogSetAsyncXactLSN() conveniently will
+	 * mark the LSN as to-be-synced and nudge the WALWriter into action if
+	 * sleeping.
+	 */
+	XLogSetAsyncXactLSN(recptr);
+
+	return recptr;
 }
 
 /*
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 7f3f051..d4a8fe4 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -113,6 +113,6 @@ typedef RunningTransactionsData *RunningTransactions;
 extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid);
 extern void LogAccessExclusiveLockPrepare(void);
 
-extern void LogStandbySnapshot(void);
+extern XLogRecPtr LogStandbySnapshot(void);
 
 #endif   /* STANDBY_H */
-- 
1.8.3.251.g1462b67

From 80cc2aafde8f513fea61e6ab6898b7e6b3627d8d Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 19 Aug 2013 13:24:30 +0200
Subject: [PATCH 06/13] wal_decoding: copydir: move fsync_fname to fd.[c.h] and
 make it public

---
 src/backend/storage/file/copydir.c | 59 --------------------------------------
 src/backend/storage/file/fd.c      | 56 ++++++++++++++++++++++++++++++++++++
 src/include/storage/fd.h           |  1 +
 3 files changed, 57 insertions(+), 59 deletions(-)

diff --git a/src/backend/storage/file/copydir.c b/src/backend/storage/file/copydir.c
index 391359c..427a0df 100644
--- a/src/backend/storage/file/copydir.c
+++ b/src/backend/storage/file/copydir.c
@@ -27,9 +27,6 @@
 #include "miscadmin.h"
 
 
-static void fsync_fname(char *fname, bool isdir);
-
-
 /*
  * copydir: copy a directory
  *
@@ -207,59 +204,3 @@ copy_file(char *fromfile, char *tofile)
 
 	pfree(buffer);
 }
-
-
-/*
- * fsync a file
- *
- * Try to fsync directories but ignore errors that indicate the OS
- * just doesn't allow/require fsyncing directories.
- */
-static void
-fsync_fname(char *fname, bool isdir)
-{
-	int			fd;
-	int			returncode;
-
-	/*
-	 * Some OSs require directories to be opened read-only whereas other
-	 * systems don't allow us to fsync files opened read-only; so we need both
-	 * cases here
-	 */
-	if (!isdir)
-		fd = OpenTransientFile(fname,
-							   O_RDWR | PG_BINARY,
-							   S_IRUSR | S_IWUSR);
-	else
-		fd = OpenTransientFile(fname,
-							   O_RDONLY | PG_BINARY,
-							   S_IRUSR | S_IWUSR);
-
-	/*
-	 * Some OSs don't allow us to open directories at all (Windows returns
-	 * EACCES)
-	 */
-	if (fd < 0 && isdir && (errno == EISDIR || errno == EACCES))
-		return;
-
-	else if (fd < 0)
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not open file \"%s\": %m", fname)));
-
-	returncode = pg_fsync(fd);
-
-	/* Some OSs don't allow us to fsync directories at all */
-	if (returncode != 0 && isdir && errno == EBADF)
-	{
-		CloseTransientFile(fd);
-		return;
-	}
-
-	if (returncode != 0)
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not fsync file \"%s\": %m", fname)));
-
-	CloseTransientFile(fd);
-}
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 436b901..de4d902 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -385,6 +385,62 @@ pg_flush_data(int fd, off_t offset, off_t amount)
 
 
 /*
+ * fsync_fname -- fsync a file or directory, handling errors properly
+ *
+ * Try to fsync a file or directory. When doing the latter, ignore errors that
+ * indicate the OS just doesn't allow/require fsyncing directories.
+ */
+void
+fsync_fname(char *fname, bool isdir)
+{
+	int			fd;
+	int			returncode;
+
+	/*
+	 * Some OSs require directories to be opened read-only whereas other
+	 * systems don't allow us to fsync files opened read-only; so we need both
+	 * cases here
+	 */
+	if (!isdir)
+		fd = OpenTransientFile(fname,
+							   O_RDWR | PG_BINARY,
+							   S_IRUSR | S_IWUSR);
+	else
+		fd = OpenTransientFile(fname,
+							   O_RDONLY | PG_BINARY,
+							   S_IRUSR | S_IWUSR);
+
+	/*
+	 * Some OSs don't allow us to open directories at all (Windows returns
+	 * EACCES)
+	 */
+	if (fd < 0 && isdir && (errno == EISDIR || errno == EACCES))
+		return;
+
+	else if (fd < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m", fname)));
+
+	returncode = pg_fsync(fd);
+
+	/* Some OSs don't allow us to fsync directories at all */
+	if (returncode != 0 && isdir && errno == EBADF)
+	{
+		CloseTransientFile(fd);
+		return;
+	}
+
+	if (returncode != 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not fsync file \"%s\": %m", fname)));
+
+	CloseTransientFile(fd);
+}
+
+
+/*
  * InitFileAccess --- initialize this module during backend startup
  *
  * This is called during either normal or standalone backend start.
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 90b4933..2a60229 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -113,6 +113,7 @@ extern int	pg_fsync_no_writethrough(int fd);
 extern int	pg_fsync_writethrough(int fd);
 extern int	pg_fdatasync(int fd);
 extern int	pg_flush_data(int fd, off_t offset, off_t amount);
+extern void fsync_fname(char *fname, bool isdir);
 
 /* Filename components for OpenTemporaryFile */
 #define PG_TEMP_FILES_DIR "pgsql_tmp"
-- 
1.8.3.251.g1462b67

From e6b541f7b11ac77fbb9afaf4a0f961057d2f25b1 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 19 Aug 2013 13:24:30 +0200
Subject: [PATCH 07/13] wal_decoding: Add information about a tables primary
 key to struct RelationData

'rd_primary' now contains the Oid of an index over uniquely identifying
columns. Several types of indexes are interesting and are considered in this
order of preference:
* Primary Key
* oid index
* the first (OID order) unique, immediate, non-partial and
  non-expression index over one or more NOT NULL'ed columns

For rd_primary to be set, RelationGetIndexList() needs to have been called.

This is helpful because for logical replication we frequently - on the sending
and receiving side - need to look up that index, and RelationGetIndexList
already gathers all the necessary information.

This could be used to replace tablecmds.c's transformFkeyGetPrimaryKey, but
would change the meaning of that, so it seems to require additional discussion.
---
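The preference order listed above boils down to a three-way fallback. A
minimal standalone sketch, assuming the three candidate Oids have already
been collected during the index scan: choose_rd_primary() is a hypothetical
helper, and Oid, InvalidOid and OidIsValid are redeclared locally as
stand-ins so the snippet compiles on its own.

```c
#include <stdint.h>

/* Local stand-ins for the PostgreSQL definitions of the same names */
typedef uint32_t Oid;

#define InvalidOid		((Oid) 0)
#define OidIsValid(oid)	((oid) != InvalidOid)

/*
 * Hypothetical helper: pick the identifying index in order of preference:
 * primary key, then oid index, then the first suitable unique index.
 * Returns InvalidOid if none of the three is available.
 */
static Oid
choose_rd_primary(Oid pkeyIndex, Oid oidIndex, Oid candidateIndex)
{
	if (OidIsValid(pkeyIndex))
		return pkeyIndex;
	if (OidIsValid(oidIndex))
		return oidIndex;
	return candidateIndex;
}
```
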
 src/backend/utils/cache/relcache.c | 52 +++++++++++++++++++++++++++++++++++---
 src/include/utils/rel.h            | 12 +++++++++
 2 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 66fb63b..c588c29 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -3465,7 +3465,9 @@ RelationGetIndexList(Relation relation)
 	ScanKeyData skey;
 	HeapTuple	htup;
 	List	   *result;
-	Oid			oidIndex;
+	Oid			oidIndex = InvalidOid;
+	Oid			pkeyIndex = InvalidOid;
+	Oid			candidateIndex = InvalidOid;
 	MemoryContext oldcxt;
 
 	/* Quick exit if we already computed the list. */
@@ -3522,17 +3524,61 @@ RelationGetIndexList(Relation relation)
 		Assert(!isnull);
 		indclass = (oidvector *) DatumGetPointer(indclassDatum);
 
+		if (!IndexIsValid(index))
+			continue;
+
 		/* Check to see if it is a unique, non-partial btree index on OID */
-		if (IndexIsValid(index) &&
-			index->indnatts == 1 &&
+		if (index->indnatts == 1 &&
 			index->indisunique && index->indimmediate &&
 			index->indkey.values[0] == ObjectIdAttributeNumber &&
 			indclass->values[0] == OID_BTREE_OPS_OID &&
 			heap_attisnull(htup, Anum_pg_index_indpred))
 			oidIndex = index->indexrelid;
+
+		if (index->indisunique &&
+			index->indimmediate &&
+			heap_attisnull(htup, Anum_pg_index_indpred))
+		{
+			/* always prefer primary keys */
+			if (index->indisprimary)
+				pkeyIndex = index->indexrelid;
+			else if (!OidIsValid(pkeyIndex)
+					&& !OidIsValid(oidIndex)
+					&& !OidIsValid(candidateIndex))
+			{
+				int key;
+				bool found = true;
+				for (key = 0; key < index->indnatts; key++)
+				{
+					int16 attno = index->indkey.values[key];
+					Form_pg_attribute attr;
+					/* internal column, like oid */
+					if (attno <= 0)
+						continue;
+
+					attr = relation->rd_att->attrs[attno - 1];
+					if (!attr->attnotnull)
+					{
+						found = false;
+						break;
+					}
+				}
+				if (found)
+					candidateIndex = index->indexrelid;
+			}
+		}
 	}
 
 	systable_endscan(indscan);
+
+	if (OidIsValid(pkeyIndex))
+		relation->rd_primary = pkeyIndex;
+	/* prefer oid indexes over normal candidate ones */
+	else if (OidIsValid(oidIndex))
+		relation->rd_primary = oidIndex;
+	else if (OidIsValid(candidateIndex))
+		relation->rd_primary = candidateIndex;
+
 	heap_close(indrel, AccessShareLock);
 
 	/* Now save a copy of the completed list in the relcache entry. */
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 589c9a8..0281b4b 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -111,6 +111,18 @@ typedef struct RelationData
 	TriggerDesc *trigdesc;		/* Trigger info, or NULL if rel has none */
 
 	/*
+	 * The 'best' primary or candidate key that has been found, only set
+	 * correctly if RelationGetIndexList has been called/rd_indexvalid > 0.
+	 *
+	 * Indexes are chosen in the following order:
+	 * * Primary Key
+	 * * oid index
+	 * * the first (OID order) unique, immediate, non-partial and
+	 *   non-expression index over one or more NOT NULL'ed columns
+	 */
+	Oid rd_primary;
+
+	/*
 	 * rd_options is set whenever rd_rel is loaded into the relcache entry.
 	 * Note that you can NOT look into rd_rel for this data.  NULL means "use
 	 * defaults".
-- 
1.8.3.251.g1462b67
