diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 47312f6..535b5a9 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1324,7 +1324,10 @@ RecordTransactionCommit(void)
 	 * in the procarray and continue to hold locks.
 	 */
 	if (wrote_xlog && markXidCommitted)
+	{
+		CausalReadsWaitForLSN(XactLastRecEnd);
 		SyncRepWaitForLSN(XactLastRecEnd);
+	}
 
 	/* remember end of last commit record */
 	XactLastCommitEnd = XactLastRecEnd;
@@ -5117,6 +5120,13 @@ XactLogCommitRecord(TimestampTz commit_time,
 		xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
 
 	/*
+	 * Check if the caller would like to ask standbys for immediate feedback
+	 * once this commit is applied.
+	 */
+	if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY || causal_reads)
+		xl_xinfo.xinfo |= XACT_COMPLETION_SYNC_APPLY_FEEDBACK;
+
+	/*
 	 * Relcache invalidations requires information about the current database
 	 * and so does logical decoding.
 	 */
@@ -5452,6 +5462,19 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 	if (XactCompletionForceSyncCommit(parsed->xinfo))
 		XLogFlush(lsn);
 
+	/*
+	 * Record the primary's timestamp for the commit record, so it can be used
+	 * for tracking replay lag.
+	 */
+	SetXLogReplayTimestamp(parsed->xact_time);
+
+	/*
+	 * If asked by the primary (because someone is waiting for a synchronous
+	 * commit or causal reads), we will need to ask walreceiver to send a
+	 * reply immediately.
+	 */
+	if (XactCompletionSyncApplyFeedback(parsed->xinfo))
+		XLogRequestWalReceiverReply();
 }
 
 /*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f17f834..531f512 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -80,6 +80,8 @@ extern uint32 bootstrap_data_checksum_version;
 #define PROMOTE_SIGNAL_FILE		"promote"
 #define FALLBACK_PROMOTE_SIGNAL_FILE "fallback_promote"
 
+/* Size of the circular buffer of timestamped LSNs. */
+#define MAX_TIMESTAMPED_LSNS 8192
 
 /* User-settable parameters */
 int			max_wal_size = 64;	/* 1 GB */
@@ -346,6 +348,12 @@ static XLogRecPtr RedoRecPtr;
 static bool doPageWrites;
 
 /*
+ * doRequestWalReceiverReply is used by recovery code to ask the main recovery
+ * loop to trigger a walreceiver reply.
+ */
+static bool doRequestWalReceiverReply;
+
+/*
  * RedoStartLSN points to the checkpoint's REDO location which is specified
  * in a backup label file, backup history file or control file. In standby
  * mode, XLOG streaming usually starts from the position where an invalid
@@ -357,6 +365,13 @@ static bool doPageWrites;
  */
 static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
 
+/*
+ * LastReplayedTimestamp can be set by redo handlers when they apply a record
+ * that carries a timestamp, by calling SetXLogReplayedTimestamp.  The xlog
+ * apply loop can then update the value in shared memory.
+ */
+static TimestampTz LastReplayedTimestamp = 0;
+
 /*----------
  * Shared-memory data structures for XLOG control
  *
@@ -632,6 +647,21 @@ typedef struct XLogCtlData
 	/* current effective recovery target timeline */
 	TimeLineID	RecoveryTargetTLI;
 
+	/* timestamp from the most recently applied record that carried a timestamp. */
+	TimestampTz lastReplayedTimestamp;
+
+	/*
+	 * We maintain a circular buffer of LSNs and associated timestamps.
+	 * Walreceiver writes into it using information from timestamps, and the
+	 * startup recovery process reads from it and notifies walreceiver when
+	 * LSNs are replayed so that the timestamps can be fed back to the
+	 * upstream server, to track lag.
+	 */
+	Index		timestampedLsnRead;
+	Index		timestampedLsnWrite;
+	XLogRecPtr	timestampedLsn[MAX_TIMESTAMPED_LSNS];
+	TimestampTz	timestampedLsnTime[MAX_TIMESTAMPED_LSNS];
+
 	/*
 	 * timestamp of when we started replaying the current chunk of WAL data,
 	 * only relevant for replication or archive recovery
@@ -6847,14 +6877,58 @@ StartupXLOG(void)
 				error_context_stack = errcallback.previous;
 
 				/*
-				 * Update lastReplayedEndRecPtr after this record has been
-				 * successfully replayed.
+				 * Update lastReplayedEndRecPtr and lastReplayedTimestamp
+				 * after this record has been successfully replayed.
 				 */
 				SpinLockAcquire(&XLogCtl->info_lck);
 				XLogCtl->lastReplayedEndRecPtr = EndRecPtr;
 				XLogCtl->lastReplayedTLI = ThisTimeLineID;
+				if (LastReplayedTimestamp != 0)
+				{
+					/* If replaying a record produced a timestamp, use that. */
+					XLogCtl->lastReplayedTimestamp = LastReplayedTimestamp;
+					LastReplayedTimestamp = 0;
+				}
+				else
+				{
+					/*
+					 * If we have applied LSNs associated with timestamps
+					 * received by walreceiver, then use the recorded
+					 * timestamp.  We consume from the read end of the
+					 * circular buffer.
+					 */
+					while (XLogCtl->timestampedLsnRead !=
+						   XLogCtl->timestampedLsnWrite &&
+						   XLogCtl->timestampedLsn[XLogCtl->timestampedLsnRead]
+						   <= EndRecPtr)
+					{
+						if (XLogCtl->timestampedLsnTime[XLogCtl->timestampedLsnRead] >
+							XLogCtl->lastReplayedTimestamp)
+						{
+							XLogCtl->lastReplayedTimestamp =
+								XLogCtl->timestampedLsnTime[XLogCtl->timestampedLsnRead];
+							doRequestWalReceiverReply = true;
+						}
+						XLogCtl->timestampedLsnRead =
+							(XLogCtl->timestampedLsnRead + 1) % MAX_TIMESTAMPED_LSNS;
+					}
+				}
 				SpinLockRelease(&XLogCtl->info_lck);
 
+				/*
+				 * If rm_redo reported that it applied a commit record that
+				 * the master is waiting for by calling
+				 * XLogRequestWalReceiverReply, or we encountered a WAL
+				 * location that was associated with a timestamp above, then
+				 * we wake up the receiver so that it notices the updated
+				 * lastReplayedEndRecPtr and sends a reply to the master.
+				 */
+				if (doRequestWalReceiverReply)
+				{
+					doRequestWalReceiverReply = false;
+					WalRcvWakeup();
+				}
+
 				/* Remember this record as the last-applied one */
 				LastRec = ReadRecPtr;
 
@@ -11583,3 +11657,103 @@ SetWalWriterSleeping(bool sleeping)
 	XLogCtl->WalWriterSleeping = sleeping;
 	SpinLockRelease(&XLogCtl->info_lck);
 }
+
+/*
+ * Called by redo code to indicate that the xlog replay loop should wake up
+ * the walreceiver process so that a reply can be sent to the primary.
+ */
+void
+XLogRequestWalReceiverReply(void)
+{
+	doRequestWalReceiverReply = true;
+}
+
+/*
+ * Merge timestamps from keepalive messages with the timestamps from WAL
+ * records, so that we can track lag while idle or while replaying large
+ * amounts of WAL without commit records.  In the former case there is no lag,
+ * and in the latter case we will remember a timestamp that goes with an
+ * arbitrary LSN, and wait for that LSN to be replayed before using the
+ * timestamp.
+ *
+ * This is called by walreceiver on standby servers when keepalive messages
+ * arrive.
+ */
+void
+SetXLogReplayTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn)
+{
+	SpinLockAcquire(&XLogCtl->info_lck);
+	if (lsn == XLogCtl->lastReplayedEndRecPtr)
+	{
+		/*
+		 * That is the last replayed LSN: we are fully replayed, so we can
+		 * update the replay timestamp immediately.
+		 */
+		XLogCtl->lastReplayedTimestamp = timestamp;
+	}
+	else
+	{
+		/*
+		 * There is WAL still to be applied.  We will associate the timestamp
+		 * with this WAL position and wait for it to be replayed.  We add it
+		 * at the 'write' end of the circular buffer of LSN/timestamp
+		 * mappings, which the replay loop will eventually read.
+		 */
+		Index w = XLogCtl->timestampedLsnWrite;
+		Index r = XLogCtl->timestampedLsnRead;
+
+		XLogCtl->timestampedLsn[w] = lsn;
+		XLogCtl->timestampedLsnTime[w] = timestamp;
+
+		/* Advance the write point. */
+		w = (w + 1) % MAX_TIMESTAMPED_LSNS;
+		XLogCtl->timestampedLsnWrite = w;
+		if (w == r)
+		{
+			/*
+			 * The buffer is full.  Advance the read point (throwing away
+			 * oldest values; we will begin to overestimate replay lag, until
+			 * lag decreases to a size our buffer can manage, or the next
+			 * commit record is replayed).
+			 */
+			r = (r + 1) % MAX_TIMESTAMPED_LSNS;
+			XLogCtl->timestampedLsnRead = r;
+		}
+	}
+	SpinLockRelease(&XLogCtl->info_lck);
+}
+
+/*
+ * Set the timestamp for the most recently applied WAL record that carried a
+ * timestamp from the primary.  This can be called by redo handlers that have
+ * an appropriate timestamp (currently only commit records).  Updating the
+ * shared memory value is deferred until after the redo handler returns.
+ */
+void
+SetXLogReplayTimestamp(TimestampTz timestamp)
+{
+	LastReplayedTimestamp = timestamp;
+}
+
+/*
+ * Get the timestamp for the most recently applied WAL record that carried a
+ * timestamp from the master, and also the most recently applied LSN.  (Note
+ * that the timestamp and the LSN don't necessarily relate to the same
+ * record.)
+ *
+ * This is similar to GetLatestXTime, except that it is not only advanced by
+ * commit records (see SetXLogReplayTimestampAtLsn).
+ */
+TimestampTz
+GetXLogReplayTimestamp(XLogRecPtr *lsn)
+{
+	TimestampTz result;
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	if (lsn)
+		*lsn = XLogCtl->lastReplayedEndRecPtr;
+	result = XLogCtl->lastReplayedTimestamp;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	return result;
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index ccc030f..f9b0e53 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -647,8 +647,10 @@ CREATE VIEW pg_stat_replication AS
             W.write_location,
             W.flush_location,
             W.replay_location,
+            W.replay_lag,
             W.sync_priority,
-            W.sync_state
+            W.sync_state,
+            W.causal_reads_state
     FROM pg_stat_get_activity(NULL) AS S, pg_authid U,
             pg_stat_get_wal_senders() AS W
     WHERE S.usesysid = U.oid AND
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 325239d..4fc5fce 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -57,6 +57,11 @@
 #include "utils/builtins.h"
 #include "utils/ps_status.h"
 
+/* GUC variables */
+int causal_reads_timeout;
+bool causal_reads;
+char *causal_reads_standby_names;
+
 /* User-settable parameters for sync rep */
 char	   *SyncRepStandbyNames;
 
@@ -69,7 +74,7 @@ static int	SyncRepWaitMode = SYNC_REP_NO_WAIT;
 
 static void SyncRepQueueInsert(int mode);
 static void SyncRepCancelWait(void);
-static int	SyncRepWakeQueue(bool all, int mode);
+static int	SyncRepWakeQueue(bool all, int mode, XLogRecPtr lsn);
 
 static int	SyncRepGetStandbyPriority(void);
 
@@ -83,6 +88,239 @@ static bool SyncRepQueueIsOrderedByLSN(int mode);
  * ===========================================================
  */
 
+static bool
+SyncRepCheckEarlyExit(void)
+{
+	/*
+	 * If a wait for synchronous replication is pending, we can neither
+	 * acknowledge the commit nor raise ERROR or FATAL.  The latter would
+	 * lead the client to believe that that the transaction aborted, which
+	 * is not true: it's already committed locally. The former is no good
+	 * either: the client has requested synchronous replication, and is
+	 * entitled to assume that an acknowledged commit is also replicated,
+	 * which might not be true. So in this case we issue a WARNING (which
+	 * some clients may be able to interpret) and shut off further output.
+	 * We do NOT reset ProcDiePending, so that the process will die after
+	 * the commit is cleaned up.
+	 */
+	if (ProcDiePending)
+	{
+		ereport(WARNING,
+				(errcode(ERRCODE_ADMIN_SHUTDOWN),
+				 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
+				 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
+		whereToSendOutput = DestNone;
+		SyncRepCancelWait();
+		return true;
+	}
+
+	/*
+	 * It's unclear what to do if a query cancel interrupt arrives.  We
+	 * can't actually abort at this point, but ignoring the interrupt
+	 * altogether is not helpful, so we just terminate the wait with a
+	 * suitable warning.
+	 */
+	if (QueryCancelPending)
+	{
+		QueryCancelPending = false;
+		ereport(WARNING,
+				(errmsg("canceling wait for synchronous replication due to user request"),
+				 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
+		SyncRepCancelWait();
+		return true;
+	}
+
+	/*
+	 * If the postmaster dies, we'll probably never get an
+	 * acknowledgement, because all the wal sender processes will exit. So
+	 * just bail out.
+	 */
+	if (!PostmasterIsAlive())
+	{
+		ProcDiePending = true;
+		whereToSendOutput = DestNone;
+		SyncRepCancelWait();
+		return true;
+	}
+
+	return false;
+}
+
+/*
+ * Check if we can stop waiting for causal consistency.  We can stop waiting
+ * when the following conditions are met:
+ *
+ * 1.  All walsenders currently in 'joining' or 'available' state have
+ * applied the target LSN.
+ *
+ * 2.  Any stall periods caused by standbys dropping out of 'available' state
+ * have passed, so that we can be sure that their leases have expired and they
+ * have started rejecting causal reads transactions.
+ *
+ * The output parameter 'waitingFor' is set to the number of nodes we are
+ * currently waiting for.  The output parameters 'stallTimeMillis' is set to
+ * the number of milliseconds we need to wait for to observe any current
+ * commit stall.
+ *
+ * Returns true if commit can return control, because every standby has either
+ * applied the LSN or started rejecting causal_reads transactions.
+ */
+static bool
+CausalReadsCommitCanReturn(XLogRecPtr XactCommitLSN,
+						   int *waitingFor,
+						   long *stallTimeMillis)
+{
+	int i;
+	TimestampTz now;
+
+	/* Count how many joining/available nodes we are waiting for. */
+	*waitingFor = 0;
+	for (i = 0; i < max_wal_senders; ++i)
+	{
+		WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+		/*
+		 * Assuming atomic read of pid_t, we can check walsnd->pid without
+		 * acquiring the spinlock to avoid memory synchronization costs for
+		 * unused walsender slots.  We see a value that existed sometime at
+		 * least as recently as the last memory barrier.
+		 */
+		if (walsnd->pid != 0)
+		{
+			/*
+			 * We need to hold the spinlock to read LSNs, because we can't be
+			 * sure they can be read atomically.
+			 */
+			SpinLockAcquire(&walsnd->mutex);
+			if (walsnd->pid != 0 && walsnd->causal_reads_state >= WALSNDCRSTATE_JOINING)
+			{
+				if (walsnd->apply < XactCommitLSN)
+					++*waitingFor;
+			}
+			SpinLockRelease(&walsnd->mutex);
+		}
+	}
+
+	/* Check if there is a stall in progress that we need to observe. */
+	now = GetCurrentTimestamp();
+	LWLockAcquire(SyncRepLock, LW_SHARED);
+	if (WalSndCtl->stall_causal_reads_until > now)
+	{
+		long seconds;
+		int usecs;
+
+		/* Compute how long we have to wait, rounded up to nearest ms. */
+		TimestampDifference(now, WalSndCtl->stall_causal_reads_until,
+							&seconds, &usecs);
+		*stallTimeMillis = seconds * 1000 + (usecs + 999) / 1000;
+	}
+	else
+		*stallTimeMillis = 0;
+	LWLockRelease(SyncRepLock);
+
+	/* We are done if we are not waiting for any nodes or stalls. */
+	return *waitingFor == 0 && *stallTimeMillis == 0;
+}
+
+/*
+ * Wait for causal consistency in causal_reads mode, if requested by user.
+ */
+void
+CausalReadsWaitForLSN(XLogRecPtr XactCommitLSN)
+{
+	long stallTimeMillis;
+	int waitingFor;
+
+	/* Leave if we aren't in causal_reads mode. */
+	if (!causal_reads)
+		return;
+
+	for (;;)
+	{
+		/* Reset latch before checking state. */
+		ResetLatch(MyLatch);
+
+		/*
+		 * Join the queue to be woken up if any causal reads joining/available
+		 * standby applies XactCommitLSN, if we aren't already in it.  We
+		 * don't actually know if we need to wait for any peers yet, but we
+		 * have to register just in case before checking the walsenders' state
+		 * to avoid a race condition that could occur if we did it after
+		 * calling CausalReadsCommitCanReturn.  (SyncRepWaitForLSN doesn't
+		 * have to do this because it can check the highest-seen LSN in
+		 * walsndctl->lsn[mode] which is protected by SyncRepLock, the same
+		 * lock as the queues.  We can't do that here, because there is no
+		 * single highest-seen LSN that is useful.  We must check
+		 * walsnd->apply for all relevant walsenders.  Therefore we must
+		 * register for notifications first, so that we can be notified via
+		 * our latch of any standby applying the LSN we're interested in after
+		 * we check but before we start waiting, or we could wait forever for
+		 * something that has already happened.)
+		 */
+		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+		if (MyProc->syncRepState != SYNC_REP_WAITING)
+		{
+			MyProc->waitLSN = XactCommitLSN;
+			MyProc->syncRepState = SYNC_REP_WAITING;
+			SyncRepQueueInsert(SYNC_REP_WAIT_CAUSAL_READS_APPLY);
+			Assert(SyncRepQueueIsOrderedByLSN(SYNC_REP_WAIT_CAUSAL_READS_APPLY));
+		}
+		LWLockRelease(SyncRepLock);
+
+		/* Check if we're done. */
+		if (CausalReadsCommitCanReturn(XactCommitLSN, &waitingFor, &stallTimeMillis))
+		{
+			SyncRepCancelWait();
+			break;
+		}
+
+		Assert(waitingFor > 0 || stallTimeMillis > 0);
+
+		/* If we aren't actually waiting for any standbys, leave the queue. */
+		if (waitingFor == 0)
+			SyncRepCancelWait();
+
+		/* Update the ps title. */
+		if (update_process_title)
+		{
+			char buffer[80];
+
+			snprintf(buffer, sizeof(buffer),
+					 "waiting for %d peer(s) to apply %X/%X%s",
+					 waitingFor,
+					 (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN,
+					 stallTimeMillis > 0 ? " (stalling)" : "");
+			set_ps_display(buffer, false);
+		}
+
+		/* Check if we need to exit early due to postmaster death etc. */
+		if (SyncRepCheckEarlyExit()) /* Calls SyncRepCancelWait() if true. */
+			break;
+
+		/*
+		 * If are still waiting for peers, then we wait for any joining or
+		 * available peer to reach the LSN (or possibly stop being in one of
+		 * those states or go away).
+		 *
+		 * If not, there must be a non-zero stall time, so we wait for that to
+		 * elapse.
+		 */
+		if (waitingFor > 0)
+			WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1);
+		else
+			WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT,
+					  stallTimeMillis);
+	}
+
+	/* There is no way out of the loop that could leave us in the queue. */
+	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
+	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
+	MyProc->waitLSN = 0;
+
+	if (update_process_title)
+		set_ps_display("", false); /* TODO: restore what was there */
+}
+
 /*
  * Wait for synchronous replication, if requested by user.
  *
@@ -180,57 +418,9 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
 		if (syncRepState == SYNC_REP_WAIT_COMPLETE)
 			break;
 
-		/*
-		 * If a wait for synchronous replication is pending, we can neither
-		 * acknowledge the commit nor raise ERROR or FATAL.  The latter would
-		 * lead the client to believe that that the transaction aborted, which
-		 * is not true: it's already committed locally. The former is no good
-		 * either: the client has requested synchronous replication, and is
-		 * entitled to assume that an acknowledged commit is also replicated,
-		 * which might not be true. So in this case we issue a WARNING (which
-		 * some clients may be able to interpret) and shut off further output.
-		 * We do NOT reset ProcDiePending, so that the process will die after
-		 * the commit is cleaned up.
-		 */
-		if (ProcDiePending)
-		{
-			ereport(WARNING,
-					(errcode(ERRCODE_ADMIN_SHUTDOWN),
-					 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
-					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
-			whereToSendOutput = DestNone;
-			SyncRepCancelWait();
-			break;
-		}
-
-		/*
-		 * It's unclear what to do if a query cancel interrupt arrives.  We
-		 * can't actually abort at this point, but ignoring the interrupt
-		 * altogether is not helpful, so we just terminate the wait with a
-		 * suitable warning.
-		 */
-		if (QueryCancelPending)
-		{
-			QueryCancelPending = false;
-			ereport(WARNING,
-					(errmsg("canceling wait for synchronous replication due to user request"),
-					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
-			SyncRepCancelWait();
-			break;
-		}
-
-		/*
-		 * If the postmaster dies, we'll probably never get an
-		 * acknowledgement, because all the wal sender processes will exit. So
-		 * just bail out.
-		 */
-		if (!PostmasterIsAlive())
-		{
-			ProcDiePending = true;
-			whereToSendOutput = DestNone;
-			SyncRepCancelWait();
+		/* Check if we need to exit early due to postmaster death etc. */
+		if (SyncRepCheckEarlyExit())
 			break;
-		}
 
 		/*
 		 * Wait on latch.  Any condition that should wake us up will set the
@@ -403,6 +593,49 @@ SyncRepGetSynchronousStandby(void)
 }
 
 /*
+ * Check if the current WALSender process's application_name matches a name in
+ * causal_reads_standby_names (including '*' for wildcard).
+ */
+bool
+CausalReadsPotentialStandby(void)
+{
+	char *rawstring;
+	List	   *elemlist;
+	ListCell   *l;
+	bool		found = false;
+
+	/* Need a modifiable copy of string */
+	rawstring = pstrdup(causal_reads_standby_names);
+
+	/* Parse string into list of identifiers */
+	if (!SplitIdentifierString(rawstring, ',', &elemlist))
+	{
+		/* syntax error in list */
+		pfree(rawstring);
+		list_free(elemlist);
+		/* GUC machinery will have already complained - no need to do again */
+		return 0;
+	}
+
+	foreach(l, elemlist)
+	{
+		char	   *standby_name = (char *) lfirst(l);
+
+		if (pg_strcasecmp(standby_name, application_name) == 0 ||
+			pg_strcasecmp(standby_name, "*") == 0)
+		{
+			found = true;
+			break;
+		}
+	}
+
+	pfree(rawstring);
+	list_free(elemlist);
+
+	return found;
+}
+
+/*
  * Update the LSNs on each queue based upon our latest state. This
  * implements a simple policy of first-valid-standby-releases-waiter.
  *
@@ -410,22 +643,27 @@ SyncRepGetSynchronousStandby(void)
  * perhaps also which information we store as well.
  */
 void
-SyncRepReleaseWaiters(void)
+SyncRepReleaseWaiters(bool walsender_cr_available_or_joining)
 {
 	volatile WalSndCtlData *walsndctl = WalSndCtl;
 	WalSnd	   *syncWalSnd;
 	int			numwrite = 0;
 	int			numflush = 0;
+	int			numapply = 0;
+	int			numcausalreadsapply = 0;
+	bool		is_highest_priority_sync_standby;
 
 	/*
 	 * If this WALSender is serving a standby that is not on the list of
-	 * potential standbys then we have nothing to do. If we are still starting
-	 * up, still running base backup or the current flush position is still
-	 * invalid, then leave quickly also.
+	 * potential standbys and not in a state that causal_reads waits for, then
+	 * we have nothing to do. If we are still starting up, still running base
+	 * backup or the current flush position is still invalid, then leave
+	 * quickly also.
 	 */
-	if (MyWalSnd->sync_standby_priority == 0 ||
-		MyWalSnd->state < WALSNDSTATE_STREAMING ||
-		XLogRecPtrIsInvalid(MyWalSnd->flush))
+	if (!walsender_cr_available_or_joining &&
+		(MyWalSnd->sync_standby_priority == 0 ||
+		 MyWalSnd->state < WALSNDSTATE_STREAMING ||
+		 XLogRecPtrIsInvalid(MyWalSnd->flush)))
 		return;
 
 	/*
@@ -435,45 +673,77 @@ SyncRepReleaseWaiters(void)
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
 	syncWalSnd = SyncRepGetSynchronousStandby();
 
-	/* We should have found ourselves at least */
-	Assert(syncWalSnd != NULL);
+	/*
+	 * If we aren't managing the highest priority standby then make a note of
+	 * that so we can announce a takeover in the log if we ever get that job.
+	 */
+	is_highest_priority_sync_standby = syncWalSnd == MyWalSnd;
+	if (!is_highest_priority_sync_standby)
+		announce_next_takeover = true;
 
 	/*
-	 * If we aren't managing the highest priority standby then just leave.
+	 * If we aren't managing the highest priority standby or a standby in
+	 * causal reads 'joining' or 'available' state, then just leave.
 	 */
-	if (syncWalSnd != MyWalSnd)
+	if (!is_highest_priority_sync_standby && !walsender_cr_available_or_joining)
 	{
 		LWLockRelease(SyncRepLock);
-		announce_next_takeover = true;
 		return;
 	}
 
 	/*
 	 * Set the lsn first so that when we wake backends they will release up to
-	 * this location.
+	 * this location.  For the single-standby synchronous commit levels, we
+	 * only do this if we are the current synchronous standby and we are
+	 * advancing the LSN further than it has been advanced before, so that
+	 * SyncRepWaitForLSN can skip waiting in some cases.
 	 */
-	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
+	if (is_highest_priority_sync_standby)
 	{
-		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
-		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
-	}
-	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
-	{
-		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
-		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
+		if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
+		{
+			walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
+			numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE,
+										MyWalSnd->write);
+		}
+		if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->write)
+		{
+			walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
+			numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH,
+										MyWalSnd->flush);
+		}
+		if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply)
+		{
+			walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
+			numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY,
+										MyWalSnd->apply);
+		}
 	}
+	/*
+	 * For causal_reads, all walsenders currently in available or joining
+	 * state must reach the LSN on their own, and standbys will reach LSNs in
+	 * any order.  It doesn't make sense to keep the highest seen LSN in a
+	 * single walsndctl->lsn element.  (CausalReadsWaitForLSN has handling for
+	 * LSNs that have already been reached).
+	 */
+	if (walsender_cr_available_or_joining)
+		numcausalreadsapply =
+			SyncRepWakeQueue(false, SYNC_REP_WAIT_CAUSAL_READS_APPLY,
+							 MyWalSnd->apply);
 
 	LWLockRelease(SyncRepLock);
 
-	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
+	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X, %d procs to causal_reads apply",
 		 numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
-	   numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
+		 numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush,
+		 numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply,
+		 numcausalreadsapply);
 
 	/*
 	 * If we are managing the highest priority standby, though we weren't
 	 * prior to this, then announce we are now the sync standby.
 	 */
-	if (announce_next_takeover)
+	if (is_highest_priority_sync_standby && announce_next_takeover)
 	{
 		announce_next_takeover = false;
 		ereport(LOG,
@@ -548,9 +818,8 @@ SyncRepGetStandbyPriority(void)
  * Must hold SyncRepLock.
  */
 static int
-SyncRepWakeQueue(bool all, int mode)
+SyncRepWakeQueue(bool all, int mode, XLogRecPtr lsn)
 {
-	volatile WalSndCtlData *walsndctl = WalSndCtl;
 	PGPROC	   *proc = NULL;
 	PGPROC	   *thisproc = NULL;
 	int			numprocs = 0;
@@ -567,7 +836,7 @@ SyncRepWakeQueue(bool all, int mode)
 		/*
 		 * Assume the queue is ordered by LSN
 		 */
-		if (!all && walsndctl->lsn[mode] < proc->waitLSN)
+		if (!all && lsn < proc->waitLSN)
 			return numprocs;
 
 		/*
@@ -627,7 +896,7 @@ SyncRepUpdateSyncStandbysDefined(void)
 			int			i;
 
 			for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
-				SyncRepWakeQueue(true, i);
+				SyncRepWakeQueue(true, i, InvalidXLogRecPtr);
 		}
 
 		/*
@@ -679,13 +948,31 @@ SyncRepQueueIsOrderedByLSN(int mode)
 #endif
 
 /*
+ * Make sure that CausalReadsWaitForLSN can't return until after the given
+ * lease expiry time has been reached.
+ *
+ * Wake up all backends waiting in CausalReadsWaitForLSN, because the set of
+ * available/joining peers has changed, and there is a new stall time they
+ * need to observe.
+ */
+void
+CausalReadsBeginStall(TimestampTz lease_expiry_time)
+{
+	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+	WalSndCtl->stall_causal_reads_until =
+		Max(WalSndCtl->stall_causal_reads_until, lease_expiry_time);
+	SyncRepWakeQueue(true, SYNC_REP_WAIT_CAUSAL_READS_APPLY, InvalidXLogRecPtr);
+	LWLockRelease(SyncRepLock);
+}
+
+/*
  * ===========================================================
  * Synchronous Replication functions executed by any process
  * ===========================================================
  */
 
 bool
-check_synchronous_standby_names(char **newval, void **extra, GucSource source)
+check_standby_names(char **newval, void **extra, GucSource source)
 {
 	char	   *rawstring;
 	List	   *elemlist;
@@ -728,6 +1015,9 @@ assign_synchronous_commit(int newval, void *extra)
 		case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
 			SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
 			break;
+		case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
+			SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
+			break;
 		default:
 			SyncRepWaitMode = SYNC_REP_NO_WAIT;
 			break;
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 183a3a5..ab61b3f 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -52,6 +52,7 @@
 #include "libpq/pqformat.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
+#include "replication/syncrep.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/ipc.h"
@@ -96,6 +97,7 @@ static uint32 recvOff = 0;
  */
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t got_SIGTERM = false;
+static volatile sig_atomic_t got_SIGUSR2 = false;
 
 /*
  * LogstreamResult indicates the byte positions that we have already
@@ -140,14 +142,33 @@ static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(bool dying);
 static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
-static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
+static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime,
+								  TimestampTz *causalReadsUntil);
 
 /* Signal handlers */
 static void WalRcvSigHupHandler(SIGNAL_ARGS);
 static void WalRcvSigUsr1Handler(SIGNAL_ARGS);
+static void WalRcvSigUsr2Handler(SIGNAL_ARGS);
 static void WalRcvShutdownHandler(SIGNAL_ARGS);
 static void WalRcvQuickDieHandler(SIGNAL_ARGS);
 
+static void WalRcvBlockSigUsr2(void)
+{
+	sigset_t mask;
+
+	sigemptyset(&mask);
+	sigaddset(&mask, SIGUSR2);
+	sigprocmask(SIG_BLOCK, &mask, NULL);
+}
+
+static void WalRcvUnblockSigUsr2(void)
+{
+	sigset_t mask;
+
+	sigemptyset(&mask);
+	sigaddset(&mask, SIGUSR2);
+	sigprocmask(SIG_UNBLOCK, &mask, NULL);
+}
 
 static void
 ProcessWalRcvInterrupts(void)
@@ -195,6 +216,7 @@ WalReceiverMain(void)
 	WalRcvData *walrcv = WalRcv;
 	TimestampTz last_recv_timestamp;
 	bool		ping_sent;
+	bool		forceReply;
 
 	/*
 	 * WalRcv should be set up already (if we are a backend, we inherit this
@@ -246,6 +268,7 @@ WalReceiverMain(void)
 
 	/* Initialise to a sanish value */
 	walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = GetCurrentTimestamp();
+	walrcv->causalReadsLease = 0;
 
 	SpinLockRelease(&walrcv->mutex);
 
@@ -263,7 +286,7 @@ WalReceiverMain(void)
 	pqsignal(SIGALRM, SIG_IGN);
 	pqsignal(SIGPIPE, SIG_IGN);
 	pqsignal(SIGUSR1, WalRcvSigUsr1Handler);
-	pqsignal(SIGUSR2, SIG_IGN);
+	pqsignal(SIGUSR2, WalRcvSigUsr2Handler);
 
 	/* Reset some signals that are accepted by postmaster but not here */
 	pqsignal(SIGCHLD, SIG_DFL);
@@ -294,6 +317,10 @@ WalReceiverMain(void)
 	/* Unblock signals (they were blocked when the postmaster forked us) */
 	PG_SETMASK(&UnBlockSig);
 
+	/* Block SIGUSR2 (we unblock it only during network waits). */
+	WalRcvBlockSigUsr2();
+	got_SIGUSR2 = false;
+
 	/* Establish the connection to the primary for XLOG streaming */
 	EnableWalRcvImmediateExit();
 	walrcv_connect(conninfo);
@@ -403,7 +430,9 @@ WalReceiverMain(void)
 				}
 
 				/* Wait a while for data to arrive */
+				WalRcvUnblockSigUsr2();
 				len = walrcv_receive(NAPTIME_PER_CYCLE, &buf);
+				WalRcvBlockSigUsr2();
 				if (len != 0)
 				{
 					/*
@@ -434,11 +463,21 @@ WalReceiverMain(void)
 							endofwal = true;
 							break;
 						}
+						WalRcvUnblockSigUsr2();
 						len = walrcv_receive(0, &buf);
+						WalRcvBlockSigUsr2();
+					}
+
+					if (got_SIGUSR2)
+					{
+						/* The recovery process asked us to force a reply. */
+						got_SIGUSR2 = false;
+						forceReply = true;
 					}
 
 					/* Let the master know that we received some data. */
-					XLogWalRcvSendReply(false, false);
+					XLogWalRcvSendReply(forceReply, false);
+					forceReply = false;
 
 					/*
 					 * If we've written some records, flush them to disk and
@@ -493,7 +532,15 @@ WalReceiverMain(void)
 						}
 					}
 
-					XLogWalRcvSendReply(requestReply, requestReply);
+					/* Check if the startup process has signaled us. */
+					if (got_SIGUSR2)
+					{
+						got_SIGUSR2 = false;
+						forceReply = true;
+					}
+
+					XLogWalRcvSendReply(requestReply || forceReply, requestReply);
+					forceReply = false;
 					XLogWalRcvSendHSFeedback(false);
 				}
 			}
@@ -735,6 +782,13 @@ WalRcvSigUsr1Handler(SIGNAL_ARGS)
 	errno = save_errno;
 }
 
+/* SIGUSR2: used to receive wakeups from recovery */
+static void
+WalRcvSigUsr2Handler(SIGNAL_ARGS)
+{
+	got_SIGUSR2 = true;
+}
+
 /* SIGTERM: set flag for main loop, or shutdown immediately if safe */
 static void
 WalRcvShutdownHandler(SIGNAL_ARGS)
@@ -795,6 +849,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 	XLogRecPtr	walEnd;
 	TimestampTz sendTime;
 	bool		replyRequested;
+	TimestampTz causalReadsLease;
 
 	resetStringInfo(&incoming_message);
 
@@ -815,7 +870,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 				walEnd = pq_getmsgint64(&incoming_message);
 				sendTime = IntegerTimestampToTimestampTz(
 										  pq_getmsgint64(&incoming_message));
-				ProcessWalSndrMessage(walEnd, sendTime);
+				ProcessWalSndrMessage(walEnd, sendTime, NULL);
 
 				buf += hdrlen;
 				len -= hdrlen;
@@ -825,7 +880,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 		case 'k':				/* Keepalive */
 			{
 				/* copy message to StringInfo */
-				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
+				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char) + sizeof(int64);
 				if (len != hdrlen)
 					ereport(ERROR,
 							(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -837,8 +892,12 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 				sendTime = IntegerTimestampToTimestampTz(
 										  pq_getmsgint64(&incoming_message));
 				replyRequested = pq_getmsgbyte(&incoming_message);
+				causalReadsLease = IntegerTimestampToTimestampTz(
+					pq_getmsgint64(&incoming_message));
+				ProcessWalSndrMessage(walEnd, sendTime, &causalReadsLease);
 
-				ProcessWalSndrMessage(walEnd, sendTime);
+				/* Remember primary's timestamp at this WAL location. */
+				SetXLogReplayTimestampAtLsn(sendTime, walEnd);
 
 				/* If the primary requested a reply, send one immediately */
 				if (replyRequested)
@@ -1032,6 +1091,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
 	XLogRecPtr	applyPtr;
 	static TimestampTz sendTime = 0;
 	TimestampTz now;
+	TimestampTz applyTimestamp = 0;
 
 	/*
 	 * If the user doesn't want status to be reported to the master, be sure
@@ -1063,7 +1123,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
 	/* Construct a new message */
 	writePtr = LogstreamResult.Write;
 	flushPtr = LogstreamResult.Flush;
-	applyPtr = GetXLogReplayRecPtr(NULL);
+	applyTimestamp = GetXLogReplayTimestamp(&applyPtr);
 
 	resetStringInfo(&reply_message);
 	pq_sendbyte(&reply_message, 'r');
@@ -1071,6 +1131,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
 	pq_sendint64(&reply_message, flushPtr);
 	pq_sendint64(&reply_message, applyPtr);
 	pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
+	pq_sendint64(&reply_message, TimestampTzToIntegerTimestamp(applyTimestamp));
 	pq_sendbyte(&reply_message, requestReply ? 1 : 0);
 
 	/* Send it */
@@ -1169,15 +1230,52 @@ XLogWalRcvSendHSFeedback(bool immed)
  * Update shared memory status upon receiving a message from primary.
  *
  * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
- * message, reported by primary.
+ * message, reported by primary.  'causalReadsLease' is a pointer to
+ * the time the primary promises that this standby can safely claim to be
+ * causally consistent, to 0 if it cannot, or a NULL pointer for no change.
  */
 static void
-ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
+ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime,
+					  TimestampTz *causalReadsLease)
 {
 	WalRcvData *walrcv = WalRcv;
 
 	TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
 
+	/* Sanity check for the causalReadsLease time. */
+	if (causalReadsLease != NULL && *causalReadsLease != 0)
+	{
+		/* Deduce max_clock_skew from the causalReadsLease and sendTime. */
+#ifdef HAVE_INT64_TIMESTAMP
+		int64 diffMillis = (*causalReadsLease - sendTime) / 1000;
+#else
+		int64 diffMillis = (*causalReadsLease - sendTime) * 1000;
+#endif
+		int64 max_clock_skew = diffMillis / (CAUSAL_READS_CLOCK_SKEW_RATIO - 1);
+
+		if (sendTime > TimestampTzPlusMilliseconds(lastMsgReceiptTime, max_clock_skew))
+		{
+			/*
+			 * The primary's clock is more than max_clock_skew + network
+			 * latency ahead of the standby's clock.  (If the primary's clock
+			 * is more than max_clock_skew ahead of the standby's clock, but
+			 * by less than the network latency, then there isn't much we can
+			 * do to detect that; but it still seems useful to have this basic
+			 * sanity check for wildly misconfigured servers.)
+			 */
+			elog(LOG, "the primary server's clock time is too far ahead");
+			causalReadsLease = NULL;
+		}
+		/*
+		 * We could also try to detect cases where sendTime is more than
+		 * max_clock_skew in the past according to the standby's clock, but
+		 * that is indistinguishable from network latency/buffering, so we
+		 * could produce misleading error messages; if we do nothing, the
+		 * consequence is 'standby is not available for causal reads' errors
+		 * which should cause the user to investigate.
+		 */
+	}
+
 	/* Update shared-memory status */
 	SpinLockAcquire(&walrcv->mutex);
 	if (walrcv->latestWalEnd < walEnd)
@@ -1185,6 +1283,8 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
 	walrcv->latestWalEnd = walEnd;
 	walrcv->lastMsgSendTime = sendTime;
 	walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
+	if (causalReadsLease != NULL)
+		walrcv->causalReadsLease = *causalReadsLease;
 	SpinLockRelease(&walrcv->mutex);
 
 	if (log_min_messages <= DEBUG2)
@@ -1215,3 +1315,23 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
 		pfree(receipttime);
 	}
 }
+
+/*
+ * Wake up the walreceiver if it happens to be blocked in walrcv_receive,
+ * and tell it that a commit record has been applied.
+ *
+ * This is called by the startup process whenever interesting xlog records
+ * are applied, so that walreceiver can check if it needs to send an apply
+ * notification back to the master which may be waiting in a COMMIT with
+ * synchronous_commit = apply or causal_reads = on.
+ *
+ * TODO: This may change -- Simon Riggs suggested latches for this.  Maybe
+ * pipes would work too (and avoid interrupting systems calls and allow for
+ * multiplexed IO with the replication socket).
+ */
+void
+WalRcvWakeup(void)
+{
+	if (WalRcv->pid != 0)
+		kill(WalRcv->pid, SIGUSR2);
+}
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 4452f25..a76d4da 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -25,9 +25,11 @@
 
 #include "access/xlog_internal.h"
 #include "postmaster/startup.h"
+#include "replication/syncrep.h"
 #include "replication/walreceiver.h"
 #include "storage/pmsignal.h"
 #include "storage/shmem.h"
+#include "utils/guc.h"
 #include "utils/timestamp.h"
 
 WalRcvData *WalRcv = NULL;
@@ -374,3 +376,21 @@ GetReplicationTransferLatency(void)
 
 	return ms;
 }
+
+/*
+ * Used by snapmgr to check if this standby has a valid lease, granting it the
+ * right to consider itself available for causal reads.
+ */
+bool
+WalRcvCausalReadsAvailable(void)
+{
+	WalRcvData *walrcv = WalRcv;
+	TimestampTz now = GetCurrentTimestamp();
+	bool result;
+
+	SpinLockAcquire(&walrcv->mutex);
+	result = walrcv->causalReadsLease != 0 && now <= walrcv->causalReadsLease;
+	SpinLockRelease(&walrcv->mutex);
+
+	return result;
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4a4643e..8405e40 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -153,9 +153,20 @@ static StringInfoData tmpbuf;
  */
 static TimestampTz last_reply_timestamp = 0;
 
+static TimestampTz last_keepalive_timestamp = 0;
+
 /* Have we sent a heartbeat message asking for reply, since last reply? */
 static bool waiting_for_ping_response = false;
 
+/* How long do need to stay in JOINING state? */
+static TimestampTz causal_reads_joining_until = 0;
+
+/* The last causal reads lease sent to the standby. */
+static TimestampTz causal_reads_last_lease = 0;
+
+/* Is this WALSender listed in causal_reads_standby_names? */
+static bool am_potential_causal_reads_standby = false;
+
 /*
  * While streaming WAL in Copy mode, streamingDoneSending is set to true
  * after we have sent CopyDone. We should not send any more CopyData messages
@@ -242,6 +253,57 @@ InitWalSender(void)
 }
 
 /*
+ * If we are exiting unexpectedly, we may need to communicate with concurrent
+ * causal_reads commits to maintain the causal consistency guarantee.
+ */
+static void
+PrepareUncleanExit(void)
+{
+	if (MyWalSnd->causal_reads_state == WALSNDCRSTATE_AVAILABLE)
+	{
+		/*
+		 * We've lost contact with the standby, but it may still be alive.  We
+		 * can't let any causal_reads transactions return until we've stalled
+		 * for long enough for a zombie standby to start raising errors
+		 * because its lease has expired.
+		 */
+		elog(LOG, "standby \"%s\" is lost (no longer available for causal reads)", application_name);
+		CausalReadsBeginStall(causal_reads_last_lease);
+
+		/*
+		 * We set the state to a lower level _after_ beginning the stall,
+		 * otherwise there would be a tiny window where commits could return
+		 * without observing the stall.
+		 */
+		SpinLockAcquire(&MyWalSnd->mutex);
+		MyWalSnd->causal_reads_state = WALSNDCRSTATE_UNAVAILABLE;
+		SpinLockRelease(&MyWalSnd->mutex);
+	}
+}
+
+/*
+ * We are shutting down because we received a goodbye message from the
+ * walreceiver.
+ */
+static void
+PrepareCleanExit(void)
+{
+	if (MyWalSnd->causal_reads_state == WALSNDCRSTATE_AVAILABLE)
+	{
+		/*
+		 * The standby is shutting down, so it won't be running any more
+		 * transactions.  It is therefore safe to stop waiting for it, and no
+		 * stall is necessary.
+		 */
+		elog(LOG, "standby \"%s\" is leaving (no longer available for causal reads)", application_name);
+
+		SpinLockAcquire(&MyWalSnd->mutex);
+		MyWalSnd->causal_reads_state = WALSNDCRSTATE_UNAVAILABLE;
+		SpinLockRelease(&MyWalSnd->mutex);
+	}
+}
+
+/*
  * Clean up after an error.
  *
  * WAL sender processes don't use transactions like regular backends do.
@@ -264,7 +326,10 @@ WalSndErrorCleanup(void)
 
 	replication_active = false;
 	if (walsender_ready_to_stop)
+	{
+		PrepareUncleanExit();
 		proc_exit(0);
+	}
 
 	/* Revert back to startup state */
 	WalSndSetState(WALSNDSTATE_STARTUP);
@@ -276,6 +341,8 @@ WalSndErrorCleanup(void)
 static void
 WalSndShutdown(void)
 {
+	PrepareUncleanExit();
+
 	/*
 	 * Reset whereToSendOutput to prevent ereport from attempting to send any
 	 * more messages to the standby.
@@ -1386,6 +1453,7 @@ ProcessRepliesIfAny(void)
 		if (r < 0)
 		{
 			/* unexpected error or EOF */
+			PrepareUncleanExit();
 			ereport(COMMERROR,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
 					 errmsg("unexpected EOF on standby connection")));
@@ -1402,6 +1470,7 @@ ProcessRepliesIfAny(void)
 		resetStringInfo(&reply_message);
 		if (pq_getmessage(&reply_message, 0))
 		{
+			PrepareUncleanExit();
 			ereport(COMMERROR,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
 					 errmsg("unexpected EOF on standby connection")));
@@ -1451,6 +1520,7 @@ ProcessRepliesIfAny(void)
 				 * 'X' means that the standby is closing down the socket.
 				 */
 			case 'X':
+				PrepareCleanExit();
 				proc_exit(0);
 
 			default:
@@ -1543,15 +1613,29 @@ ProcessStandbyReplyMessage(void)
 	XLogRecPtr	writePtr,
 				flushPtr,
 				applyPtr;
+	int			applyLagMs;
 	bool		replyRequested;
+	TimestampTz now = GetCurrentTimestamp();
+	TimestampTz applyTimestamp;
 
 	/* the caller already consumed the msgtype byte */
 	writePtr = pq_getmsgint64(&reply_message);
 	flushPtr = pq_getmsgint64(&reply_message);
 	applyPtr = pq_getmsgint64(&reply_message);
 	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
+	applyTimestamp = IntegerTimestampToTimestampTz(pq_getmsgint64(&reply_message));
 	replyRequested = pq_getmsgbyte(&reply_message);
 
+	/* Compute the apply lag in milliseconds. */
+	if (applyTimestamp == 0)
+		applyLagMs = -1;
+	else
+#ifdef HAVE_INT64_TIMESTAMP
+		applyLagMs = (now - applyTimestamp) / 1000;
+#else
+		applyLagMs = (now - applyTimestamp) * 1000.0;
+#endif
+
 	elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
 		 (uint32) (writePtr >> 32), (uint32) writePtr,
 		 (uint32) (flushPtr >> 32), (uint32) flushPtr,
@@ -1568,16 +1652,91 @@ ProcessStandbyReplyMessage(void)
 	 */
 	{
 		WalSnd *walsnd = MyWalSnd;
+		WalSndCausalReadsState causal_reads_state = walsnd->causal_reads_state;
+		bool causal_reads_state_changed = false;
+
+		/*
+		 * Handle causal reads state transitions, if a causal_reads_timeout is
+		 * configured, this standby is listed in causal_reads_standby_names,
+		 * and we are a primary database (not a cascading standby).
+		 */
+		if (causal_reads_timeout != 0 &&
+			am_potential_causal_reads_standby &&
+			!am_cascading_walsender)
+		{
+			if (applyLagMs >= 0 && applyLagMs < causal_reads_timeout)
+			{
+				if (causal_reads_state == WALSNDCRSTATE_UNAVAILABLE)
+				{
+					causal_reads_state = WALSNDCRSTATE_JOINING;
+					causal_reads_joining_until =
+						TimestampTzPlusMilliseconds(now, causal_reads_timeout);
+					causal_reads_state_changed = true;
+				}
+				else if (causal_reads_state == WALSNDCRSTATE_JOINING &&
+						 now >= causal_reads_joining_until)
+				{
+					causal_reads_state = WALSNDCRSTATE_AVAILABLE;
+					causal_reads_state_changed = true;
+				}
+			}
+			else
+			{
+				if (causal_reads_state == WALSNDCRSTATE_AVAILABLE)
+				{
+					causal_reads_state = WALSNDCRSTATE_UNAVAILABLE;
+					causal_reads_state_changed = true;
+					/*
+					 * We are dropping a causal reads available standby, so we
+					 * mustn't let any commit command that is waiting in
+					 * CausalReadsWaitForLSN return until we are sure that the
+					 * standby definitely knows that it's not available and
+					 * starts raising errors for causal_reads transactions.
+					 * TODO: We could just wait until the standby acks that
+					 * its lease has been cancelled, and start numbering
+					 * keepalives and sending the number back in replies, so
+					 * we know it's acking the right message; then lagging
+					 * standbys would be less disruptive, but for now we just
+					 * wait for the lease to expire, as we do when we lose
+					 * contact with a standby, for the sake of simplicity.
+					 */
+					CausalReadsBeginStall(causal_reads_last_lease);
+				}
+				else if (causal_reads_state == WALSNDCRSTATE_JOINING)
+				{
+					/*
+					 * Dropping a joining standby doesn't require a stall,
+					 * because the standby doesn't think it's available, so
+					 * it's already raising the error for causal_reads
+					 * transactions.
+					 */
+					causal_reads_state = WALSNDCRSTATE_UNAVAILABLE;
+					causal_reads_state_changed = true;
+				}
+			}
+		}
 
 		SpinLockAcquire(&walsnd->mutex);
 		walsnd->write = writePtr;
 		walsnd->flush = flushPtr;
 		walsnd->apply = applyPtr;
+		walsnd->applyLagMs = applyLagMs;
+		walsnd->causal_reads_state = causal_reads_state;
 		SpinLockRelease(&walsnd->mutex);
+
+		if (causal_reads_state_changed)
+		{
+			WalSndKeepalive(true);
+			elog(LOG, "standby \"%s\" is %s", application_name,
+				 causal_reads_state == WALSNDCRSTATE_UNAVAILABLE ? "unavailable for causal reads" :
+				 causal_reads_state == WALSNDCRSTATE_JOINING ? "joining as a causal reads standby..." :
+				 causal_reads_state == WALSNDCRSTATE_AVAILABLE ? "available for causal reads" :
+				 "UNKNOWN");
+		}
 	}
 
 	if (!am_cascading_walsender)
-		SyncRepReleaseWaiters();
+		SyncRepReleaseWaiters(MyWalSnd->causal_reads_state >= WALSNDCRSTATE_JOINING);
 
 	/*
 	 * Advance our local xmin horizon when the client confirmed a flush.
@@ -1724,27 +1883,34 @@ WalSndComputeSleeptime(TimestampTz now)
 {
 	long		sleeptime = 10000;		/* 10 s */
 
-	if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
+	if ((wal_sender_timeout > 0 || causal_reads_timeout > 0) && last_reply_timestamp > 0)
 	{
 		TimestampTz wakeup_time;
 		long		sec_to_timeout;
 		int			microsec_to_timeout;
 
-		/*
-		 * At the latest stop sleeping once wal_sender_timeout has been
-		 * reached.
-		 */
-		wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
-												  wal_sender_timeout);
-
-		/*
-		 * If no ping has been sent yet, wakeup when it's time to do so.
-		 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
-		 * the timeout passed without a response.
-		 */
-		if (!waiting_for_ping_response)
+		if (causal_reads_timeout != 0)
+			wakeup_time = TimestampTzPlusMilliseconds(last_keepalive_timestamp,
+													  causal_reads_timeout /
+													  CAUSAL_READS_KEEPALIVE_RATIO);
+		else
+		{
+			/*
+			 * At the latest stop sleeping once wal_sender_timeout has been
+			 * reached.
+			 */
 			wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
-													  wal_sender_timeout / 2);
+													  wal_sender_timeout);
+
+			/*
+			 * If no ping has been sent yet, wakeup when it's time to do so.
+			 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
+			 * the timeout passed without a response.
+			 */
+			if (!waiting_for_ping_response)
+				wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+														  wal_sender_timeout / 2);
+		}
 
 		/* Compute relative time until wakeup. */
 		TimestampDifference(now, wakeup_time,
@@ -1765,15 +1931,28 @@ static void
 WalSndCheckTimeOut(TimestampTz now)
 {
 	TimestampTz timeout;
+	int allowed_time;
 
 	/* don't bail out if we're doing something that doesn't require timeouts */
 	if (last_reply_timestamp <= 0)
 		return;
 
+	/*
+	 * If a causal_reads_timeout is configured, it is used instead of
+	 * wal_sender_timeout.  Ideally we'd use causal_reads_timeout / 2 +
+	 * allowance for network latency, but since walreceiver can become quite
+	 * bogged down fsyncing WAL we allow more tolerance.  (This could be
+	 * tightened up once standbys hand writing off to the WAL writer).
+	 */
+	if (causal_reads_timeout != 0)
+		allowed_time = causal_reads_timeout;
+	else
+		allowed_time = wal_sender_timeout;
+
 	timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
-										  wal_sender_timeout);
+										  allowed_time);
 
-	if (wal_sender_timeout > 0 && now >= timeout)
+	if (allowed_time > 0 && now >= timeout)
 	{
 		/*
 		 * Since typically expiration of replication timeout means
@@ -1806,6 +1985,9 @@ WalSndLoop(WalSndSendDataCallback send_data)
 	last_reply_timestamp = GetCurrentTimestamp();
 	waiting_for_ping_response = false;
 
+	/* Check if we are managing potential causal_reads standby. */
+	am_potential_causal_reads_standby = CausalReadsPotentialStandby();
+
 	/*
 	 * Loop until we reach the end of this timeline or the client requests to
 	 * stop streaming.
@@ -1963,6 +2145,7 @@ InitWalSenderSlot(void)
 			walsnd->pid = MyProcPid;
 			walsnd->sentPtr = InvalidXLogRecPtr;
 			walsnd->state = WALSNDSTATE_STARTUP;
+			walsnd->causal_reads_state = WALSNDCRSTATE_UNAVAILABLE;
 			walsnd->latch = &MyProc->procLatch;
 			SpinLockRelease(&walsnd->mutex);
 			/* don't need the lock anymore */
@@ -2732,6 +2915,24 @@ WalSndGetStateString(WalSndState state)
 	return "UNKNOWN";
 }
 
+/*
+ * Return a string constant representing the causal reads state. This is used
+ * in system views, and should *not* be translated.
+ */
+static const char *
+WalSndGetCausalReadsStateString(WalSndCausalReadsState causal_reads_state)
+{
+	switch (causal_reads_state)
+	{
+		case WALSNDCRSTATE_UNAVAILABLE:
+			return "unavailable";
+		case WALSNDCRSTATE_JOINING:
+			return "joining";
+		case WALSNDCRSTATE_AVAILABLE:
+			return "available";
+	}
+	return "UNKNOWN";
+}
 
 /*
  * Returns activity of walsenders, including pids and xlog locations sent to
@@ -2740,7 +2941,7 @@ WalSndGetStateString(WalSndState state)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS	8
+#define PG_STAT_GET_WAL_SENDERS_COLS	10
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -2788,8 +2989,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		XLogRecPtr	write;
 		XLogRecPtr	flush;
 		XLogRecPtr	apply;
+		int			applyLagMs;
 		int			priority;
 		WalSndState state;
+		WalSndCausalReadsState causalReadsState;
 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
 		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS];
 
@@ -2799,9 +3002,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		SpinLockAcquire(&walsnd->mutex);
 		sentPtr = walsnd->sentPtr;
 		state = walsnd->state;
+		causalReadsState = walsnd->causal_reads_state;
 		write = walsnd->write;
 		flush = walsnd->flush;
 		apply = walsnd->apply;
+		applyLagMs = walsnd->applyLagMs;
 		priority = walsnd->sync_standby_priority;
 		SpinLockRelease(&walsnd->mutex);
 
@@ -2833,6 +3038,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 				nulls[5] = true;
 			values[5] = LSNGetDatum(apply);
 
+			if (applyLagMs < 0)
+				nulls[6] = true;
+			else
+			{
+				Interval *applyLagInterval = palloc(sizeof(Interval));
+
+				applyLagInterval->month = 0;
+				applyLagInterval->day = 0;
+#ifdef HAVE_INT64_TIMESTAMP
+				applyLagInterval->time = applyLagMs * 1000;
+#else
+				applyLagInterval->time = applyLagMs / 1000.0;
+#endif
+				nulls[6] = false;
+				values[6] = IntervalPGetDatum(applyLagInterval);
+			}
+
 			/*
 			 * Treat a standby such as a pg_basebackup background process
 			 * which always returns an invalid flush location, as an
@@ -2840,18 +3062,21 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 			 */
 			priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
 
-			values[6] = Int32GetDatum(priority);
+			values[7] = Int32GetDatum(priority);
 
 			/*
 			 * More easily understood version of standby state. This is purely
 			 * informational, not different from priority.
 			 */
 			if (priority == 0)
-				values[7] = CStringGetTextDatum("async");
+				values[8] = CStringGetTextDatum("async");
 			else if (walsnd == sync_standby)
-				values[7] = CStringGetTextDatum("sync");
+				values[8] = CStringGetTextDatum("sync");
 			else
-				values[7] = CStringGetTextDatum("potential");
+				values[8] = CStringGetTextDatum("potential");
+
+			values[9] =
+				CStringGetTextDatum(WalSndGetCausalReadsStateString(causalReadsState));
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -2871,14 +3096,52 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 static void
 WalSndKeepalive(bool requestReply)
 {
+	TimestampTz now;
+	TimestampTz causal_reads_lease;
+
 	elog(DEBUG2, "sending replication keepalive");
 
+	/*
+	 * If the walsender currently deems the standby to be available for causal
+	 * reads, then it grants a causal reads lease.  The lease authorizes the
+	 * standby to consider itself available for causal reads until a short
+	 * time in the future.  The primary promises to uphold the causal reads
+	 * guarantee until that time, by stalling commits until the the lease has
+	 * expired if necessary.
+	 */
+	now = GetCurrentTimestamp();
+	if (MyWalSnd->causal_reads_state < WALSNDCRSTATE_AVAILABLE)
+		causal_reads_lease = 0; /* Not available, no lease granted. */
+	else
+	{
+		/*
+		 * Since this timestamp is being sent to the standby where it will be
+		 * compared against a time generated by the standby's system clock, we
+		 * must consider clock skew.  First, we decide on a maximum tolerable
+		 * difference between system clocks.  If the primary's clock is ahead
+		 * of the standby's by more than this, then all bets are off (the
+		 * standby could falsely believe it has a valid lease).  If the
+		 * primary's clock is behind the standby's by more than this, then the
+		 * standby will err the other way and generate spurious errors in
+		 * causal_reads mode.  Rather than having a separate GUC for this, we
+		 * derive it from causal_reads_timeout.
+		 */
+		int max_clock_skew = causal_reads_timeout / CAUSAL_READS_CLOCK_SKEW_RATIO;
+
+		/* Compute and remember the expiry time of the lease we're granting. */
+		causal_reads_last_lease = TimestampTzPlusMilliseconds(now, causal_reads_timeout);
+		/* The version we'll send to the standby is adjusted to tolerate clock skew. */
+		causal_reads_lease =
+			TimestampTzPlusMilliseconds(causal_reads_last_lease, -max_clock_skew);
+	}
+
 	/* construct the message... */
 	resetStringInfo(&output_message);
 	pq_sendbyte(&output_message, 'k');
 	pq_sendint64(&output_message, sentPtr);
-	pq_sendint64(&output_message, GetCurrentIntegerTimestamp());
+	pq_sendint64(&output_message, TimestampTzToIntegerTimestamp(now));
 	pq_sendbyte(&output_message, requestReply ? 1 : 0);
+	pq_sendint64(&output_message, TimestampTzToIntegerTimestamp(causal_reads_lease));
 
 	/* ... and send it wrapped in CopyData */
 	pq_putmessage_noblock('d', output_message.data, output_message.len);
@@ -2896,23 +3159,32 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 	 * Don't send keepalive messages if timeouts are globally disabled or
 	 * we're doing something not partaking in timeouts.
 	 */
-	if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
+	if ((wal_sender_timeout <= 0 && causal_reads_timeout == 0) || last_reply_timestamp <= 0)
 		return;
 
-	if (waiting_for_ping_response)
+	if (waiting_for_ping_response && causal_reads_timeout == 0)
 		return;
 
 	/*
 	 * If half of wal_sender_timeout has lapsed without receiving any reply
 	 * from the standby, send a keep-alive message to the standby requesting
 	 * an immediate reply.
+	 *
+	 * If causal_reads_timeout has been configured, use it to control
+	 * keepalive intervals rather than wal_sender_timeout.
 	 */
-	ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
-											wal_sender_timeout / 2);
+	if (causal_reads_timeout != 0)
+		ping_time = TimestampTzPlusMilliseconds(last_keepalive_timestamp,
+												causal_reads_timeout /
+												CAUSAL_READS_KEEPALIVE_RATIO);
+	else
+		ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+												wal_sender_timeout / 2);
 	if (now >= ping_time)
 	{
 		WalSndKeepalive(true);
 		waiting_for_ping_response = true;
+		last_keepalive_timestamp = now;
 
 		/* Try to flush pending output to the client */
 		if (pq_flush_if_writable() != 0)
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index 8fbb310..12c8b88 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -1611,6 +1611,20 @@ IntegerTimestampToTimestampTz(int64 timestamp)
 #endif
 
 /*
+ * TimestampTzToIntegerTimestamp -- convert a native timestamp to int64 format
+ *
+ * When compiled with --enable-integer-datetimes, this is implemented as a
+ * no-op macro.
+ */
+#ifndef HAVE_INT64_TIMESTAMP
+int64
+TimestampTzToIntegerTimestamp(TimestampTz timestamp)
+{
+	return timestamp * 1000000;
+}
+#endif
+
+/*
  * TimestampDifference -- convert the difference between two timestamps
  *		into integer seconds and microseconds
  *
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index a185749..6e0b144 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -351,6 +351,7 @@ static const struct config_enum_entry constraint_exclusion_options[] = {
 static const struct config_enum_entry synchronous_commit_options[] = {
 	{"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false},
 	{"remote_write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false},
+	{"apply", SYNCHRONOUS_COMMIT_REMOTE_APPLY, false},
 	{"on", SYNCHRONOUS_COMMIT_ON, false},
 	{"off", SYNCHRONOUS_COMMIT_OFF, false},
 	{"true", SYNCHRONOUS_COMMIT_ON, true},
@@ -1618,6 +1619,16 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"causal_reads", PGC_USERSET, REPLICATION_STANDBY,
+		 gettext_noop("Enables causal reads."),
+		 NULL
+		},
+		&causal_reads,
+		false,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
@@ -1776,6 +1787,17 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"causal_reads_timeout", PGC_SIGHUP, REPLICATION_STANDBY,
+			gettext_noop("Sets the maximum apply lag before causal reads standbys are no longer available."),
+			NULL,
+			GUC_UNIT_MS
+		},
+		&causal_reads_timeout,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
 			gettext_noop("Sets the maximum number of concurrent connections."),
 			NULL
@@ -3361,7 +3383,18 @@ static struct config_string ConfigureNamesString[] =
 		},
 		&SyncRepStandbyNames,
 		"",
-		check_synchronous_standby_names, NULL, NULL
+		check_standby_names, NULL, NULL
+	},
+
+	{
+		{"causal_reads_standby_names", PGC_SIGHUP, REPLICATION_MASTER,
+			gettext_noop("List of names of potential causal reads standbys."),
+			NULL,
+			GUC_LIST_INPUT
+		},
+		&causal_reads_standby_names,
+		"*",
+		check_standby_names, NULL, NULL
 	},
 
 	{
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 074935c..a466732 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -46,8 +46,11 @@
 
 #include "access/transam.h"
 #include "access/xact.h"
+#include "access/xlog.h"
 #include "lib/pairingheap.h"
 #include "miscadmin.h"
+#include "replication/syncrep.h"
+#include "replication/walreceiver.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -209,6 +212,16 @@ GetTransactionSnapshot(void)
 				 "cannot take query snapshot during a parallel operation");
 
 		/*
+		 * In causal_reads mode on a standby, check if we have definitely
+		 * applied WAL for any COMMIT that returned successfully on the
+		 * primary.
+		 *
+		 * TODO: Machine readable error code?
+		 */
+		if (causal_reads && RecoveryInProgress() && !WalRcvCausalReadsAvailable())
+			elog(ERROR, "standby is not available for causal reads");
+
+		/*
 		 * In transaction-snapshot mode, the first snapshot must live until
 		 * end of xact regardless of what the caller does with it, so we must
 		 * make a copy of it rather than returning CurrentSnapshotData
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index cb1c2db..0f08ff5 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -60,7 +60,11 @@ typedef enum
 	SYNCHRONOUS_COMMIT_LOCAL_FLUSH,		/* wait for local flush only */
 	SYNCHRONOUS_COMMIT_REMOTE_WRITE,	/* wait for local flush and remote
 										 * write */
-	SYNCHRONOUS_COMMIT_REMOTE_FLUSH		/* wait for local and remote flush */
+	SYNCHRONOUS_COMMIT_REMOTE_FLUSH,	/* wait for local and remote flush */
+	SYNCHRONOUS_COMMIT_REMOTE_APPLY,	/* wait for local flush and remote
+										 * apply */
+	SYNCHRONOUS_COMMIT_CONSISTENT_APPLY /* wait for local flusha and remote
+										   apply with causal consistency */
 }	SyncCommitLevel;
 
 /* Define the default setting for synchonous_commit */
@@ -144,10 +148,13 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
  * EOXact... routines which run at the end of the original transaction
  * completion.
  */
+#define XACT_COMPLETION_SYNC_APPLY_FEEDBACK		(1U << 29)
 #define XACT_COMPLETION_UPDATE_RELCACHE_FILE	(1U << 30)
 #define XACT_COMPLETION_FORCE_SYNC_COMMIT		(1U << 31)
 
 /* Access macros for above flags */
+#define XactCompletionSyncApplyFeedback(xinfo) \
+	(!!(xinfo & XACT_COMPLETION_SYNC_APPLY_FEEDBACK))
 #define XactCompletionRelcacheInitFileInval(xinfo) \
 	(!!(xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE))
 #define XactCompletionForceSyncCommit(xinfo) \
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 790ca66..8aeda11 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -235,6 +235,9 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
 extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
 extern XLogRecPtr GetXLogInsertRecPtr(void);
 extern XLogRecPtr GetXLogWriteRecPtr(void);
+extern void SetXLogReplayTimestamp(TimestampTz timestamp);
+extern void SetXLogReplayTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn);
+extern TimestampTz GetXLogReplayTimestamp(XLogRecPtr *lsn);
 extern bool RecoveryIsPaused(void);
 extern void SetRecoveryPause(bool recoveryPause);
 extern TimestampTz GetLatestXTime(void);
@@ -267,6 +270,8 @@ extern bool CheckPromoteSignal(void);
 extern void WakeupRecovery(void);
 extern void SetWalWriterSleeping(bool sleeping);
 
+extern void XLogRequestWalReceiverReply(void);
+
 extern void assign_max_wal_size(int newval, void *extra);
 extern void assign_checkpoint_completion_target(double newval, void *extra);
 
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index d8640db..acb6796 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2783,7 +2783,7 @@ DATA(insert OID = 1936 (  pg_stat_get_backend_idset		PGNSP PGUID 12 1 100 0 0 f
 DESCR("statistics: currently active backend IDs");
 DATA(insert OID = 2022 (  pg_stat_get_activity			PGNSP PGUID 12 1 100 0 0 f f f f f t s r 1 0 2249 "23" "{23,26,23,26,25,25,25,16,1184,1184,1184,1184,869,25,23,28,28,16,25,25,23,16,25}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,pid,usesysid,application_name,state,query,waiting,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,ssl,sslversion,sslcipher,sslbits,sslcompression,sslclientdn}" _null_ _null_ pg_stat_get_activity _null_ _null_ _null_ ));
 DESCR("statistics: information about currently active backends");
-DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
+DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,23,25,25}" "{o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,replay_lag,sync_priority,sync_state,causal_reads_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
 DESCR("statistics: information about currently active replication");
 DATA(insert OID = 2026 (  pg_backend_pid				PGNSP PGUID 12 1 0 0 0 f f f f t f s r 0 0 23 "" _null_ _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
 DESCR("statistics: current backend PID");
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 71e2857..6a090b7 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -23,14 +23,34 @@
 #define SYNC_REP_NO_WAIT		-1
 #define SYNC_REP_WAIT_WRITE		0
 #define SYNC_REP_WAIT_FLUSH		1
+#define SYNC_REP_WAIT_APPLY		2
+#define SYNC_REP_WAIT_CAUSAL_READS_APPLY 3
 
-#define NUM_SYNC_REP_WAIT_MODE	2
+#define NUM_SYNC_REP_WAIT_MODE	4
 
 /* syncRepState */
 #define SYNC_REP_NOT_WAITING		0
 #define SYNC_REP_WAITING			1
 #define SYNC_REP_WAIT_COMPLETE		2
 
+/*
+ * ratio of causal_read_timeout to max_clock_skew (4 means than the maximum
+ * tolerated clock difference between primary and standbys using causal_reads
+ * is 1/4 of causal_reads_timeout)
+ */
+#define CAUSAL_READS_CLOCK_SKEW_RATIO 4
+
+/*
+ * ratio of causal_reads_timeout to keepalive time (2 means that the effective
+ * keepalive time is 1/2 of the causal_reads_timeout GUC when it is non-zero)
+ */
+#define CAUSAL_READS_KEEPALIVE_RATIO 2
+
+/* GUC variables */
+extern int causal_reads_timeout;
+extern bool causal_reads;
+extern char *causal_reads_standby_names;
+
 /* user-settable parameters for synchronous replication */
 extern char *SyncRepStandbyNames;
 
@@ -42,16 +62,23 @@ extern void SyncRepCleanupAtProcExit(void);
 
 /* called by wal sender */
 extern void SyncRepInitConfig(void);
-extern void SyncRepReleaseWaiters(void);
+extern void SyncRepReleaseWaiters(bool walsender_cr_available_or_joining);
 
 /* called by checkpointer */
 extern void SyncRepUpdateSyncStandbysDefined(void);
 
+/* called by user backend (xact.c) */
+extern void CausalReadsWaitForLSN(XLogRecPtr XactCommitLSN);
+
+/* called by wal sender */
+extern void CausalReadsBeginStall(TimestampTz lease_expiry_time);
+extern bool CausalReadsPotentialStandby(void);
+
 /* forward declaration to avoid pulling in walsender_private.h */
 struct WalSnd;
 extern struct WalSnd *SyncRepGetSynchronousStandby(void);
 
-extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
+extern bool check_standby_names(char **newval, void **extra, GucSource source);
 extern void assign_synchronous_commit(int newval, void *extra);
 
 #endif   /* _SYNCREP_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 61255a9..40d99e6 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -79,6 +79,13 @@ typedef struct
 	TimeLineID	receivedTLI;
 
 	/*
+	 * causallyReadsLease is the time until which the primary has authorized
+	 * this standby to consider itself available for causal_reads mode, or 0
+	 * for not authorized.
+	 */
+	TimestampTz causalReadsLease;
+
+	/*
 	 * latestChunkStart is the starting byte position of the current "batch"
 	 * of received WAL.  It's actually the same as the previous value of
 	 * receivedUpto before the last flush to disk.  Startup process can use
@@ -160,5 +167,8 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
 extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
 extern int	GetReplicationApplyDelay(void);
 extern int	GetReplicationTransferLatency(void);
+extern void WalRcvWakeup(void);
+
+extern bool WalRcvCausalReadsAvailable(void);
 
 #endif   /* _WALRECEIVER_H */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 6dae480..88b4fe9 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -27,6 +27,13 @@ typedef enum WalSndState
 	WALSNDSTATE_STREAMING
 } WalSndState;
 
+typedef enum WalSndCausalReadsState
+{
+	WALSNDCRSTATE_UNAVAILABLE = 0,
+	WALSNDCRSTATE_JOINING,
+	WALSNDCRSTATE_AVAILABLE
+} WalSndCausalReadsState;
+
 /*
  * Each walsender has a WalSnd struct in shared memory.
  */
@@ -34,6 +41,7 @@ typedef struct WalSnd
 {
 	pid_t		pid;			/* this walsender's process id, or 0 */
 	WalSndState state;			/* this walsender's state */
+	WalSndCausalReadsState causal_reads_state; /* the walsender's causal reads state */
 	XLogRecPtr	sentPtr;		/* WAL has been sent up to this point */
 	bool		needreload;		/* does currently-open file need to be
 								 * reloaded? */
@@ -46,6 +54,7 @@ typedef struct WalSnd
 	XLogRecPtr	write;
 	XLogRecPtr	flush;
 	XLogRecPtr	apply;
+	int			applyLagMs;
 
 	/* Protects shared variables shown above. */
 	slock_t		mutex;
@@ -88,6 +97,12 @@ typedef struct
 	 */
 	bool		sync_standbys_defined;
 
+	/*
+	 * Until when must commits in causal_reads stall?  This is used to wait
+	 * for causal reads leases to expire.
+	 */
+	TimestampTz	stall_causal_reads_until;
+
 	WalSnd		walsnds[FLEXIBLE_ARRAY_MEMBER];
 } WalSndCtlData;
 
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index 530fef1..0f4b166 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -227,9 +227,11 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time,
 #ifndef HAVE_INT64_TIMESTAMP
 extern int64 GetCurrentIntegerTimestamp(void);
 extern TimestampTz IntegerTimestampToTimestampTz(int64 timestamp);
+extern int64 TimestampTzToIntegerTimestamp(TimestampTz timestamp);
 #else
 #define GetCurrentIntegerTimestamp()	GetCurrentTimestamp()
 #define IntegerTimestampToTimestampTz(timestamp) (timestamp)
+#define TimestampTzToIntegerTimestamp(timestamp) (timestamp)
 #endif
 
 extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
