From 719edc52e6298790a1d7ab0942e232ef9d9e9954 Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Sat, 4 Nov 2017 19:03:03 +0100
Subject: [PATCH v3 2/2] shm_mq: reduce receiver latch sets by batching bytes-read updates

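The shm_mq receiver previously called shm_mq_inc_bytes_read(), which
writes the shared mq_bytes_read counter and sets the sender's latch,
each time it consumed data from the ring.  Instead, accumulate the
number of consumed bytes in the backend-private field
mqh_consume_pending and write it back to shared memory only when the
pending amount exceeds a quarter of the ring size, or when
shm_mq_receive_bytes() finds too little data available and needs to
free up ring space before (possibly) blocking.  Since SetLatch() is
fairly expensive, this reduces how often the receiver wakes the
sender.

To make the deferred accounting work, shm_mq_receive_bytes() now takes
the shm_mq_handle rather than the bare shm_mq, so that it can add
mqh_consume_pending to mq_bytes_read when computing how much unread
data is in the ring.

In outline, the receiver-side pattern looks like the sketch below.
This is an illustrative, self-contained model written for this message
only; ring_reader, shared_bytes_read, flush_pending, maybe_flush and
consume are made-up names that do not appear in the patch.

    #include <stdatomic.h>
    #include <stddef.h>

    /* Toy model of the receiver-side bookkeeping. */
    typedef struct ring_reader
    {
        atomic_ulong *shared_bytes_read; /* publishing this wakes the sender */
        size_t        ring_size;         /* total size of the ring buffer */
        size_t        consume_pending;   /* consumed locally, not yet published */
    } ring_reader;

    /* Publish pending consumption; stands in for shm_mq_inc_bytes_read(),
     * which also performs the (expensive) SetLatch() on the sender. */
    static void
    flush_pending(ring_reader *r)
    {
        atomic_fetch_add(r->shared_bytes_read, r->consume_pending);
        r->consume_pending = 0;
    }

    /* At the start of each receive, publish only if enough has piled up;
     * this mirrors the new check at the top of shm_mq_receive(). */
    static void
    maybe_flush(ring_reader *r)
    {
        if (r->consume_pending > r->ring_size / 4)
            flush_pending(r);
    }

    /* Consuming data just bumps the private counter: no shared write and
     * no latch manipulation on the fast path. */
    static void
    consume(ring_reader *r, size_t nbytes)
    {
        r->consume_pending += nbytes;
    }

The real code also flushes the pending count before sleeping in
shm_mq_receive_bytes(), so that a sender waiting for ring space is not
held up by bytes the receiver has in fact already consumed.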
---
 src/backend/storage/ipc/shm_mq.c | 70 +++++++++++++++++++++++++---------------
 1 file changed, 44 insertions(+), 26 deletions(-)

diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index f3ede48d9b..02a5df8da9 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -143,10 +143,11 @@ struct shm_mq_handle
 };
 
 static void shm_mq_detach_internal(shm_mq *mq);
-static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
+static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
 				  const void *data, bool nowait, Size *bytes_written);
-static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
-					 bool nowait, Size *nbytesp, void **datap);
+static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
+					 Size bytes_needed, bool nowait, Size *nbytesp,
+					 void **datap);
 static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
 						 BackgroundWorkerHandle *handle);
 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
@@ -586,8 +587,14 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 		mqh->mqh_counterparty_attached = true;
 	}
 
-	/* Consume any zero-copy data from previous receive operation. */
-	if (mqh->mqh_consume_pending > 0)
+	/*
+	 * If we've consumed an amount of data greater than 1/4th of the ring
+	 * size, mark it consumed in shared memory.  We try to avoid doing this
+	 * unnecessarily when only a small amount of data has been consumed,
+	 * because SetLatch() is fairly expensive and we don't want to do it too
+	 * often.
+	 */
+	if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
 	{
 		shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
 		mqh->mqh_consume_pending = 0;
@@ -598,7 +605,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 	{
 		/* Try to receive the message length word. */
 		Assert(mqh->mqh_partial_bytes < sizeof(Size));
-		res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
+		res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
 								   nowait, &rb, &rawdata);
 		if (res != SHM_MQ_SUCCESS)
 			return res;
@@ -618,13 +625,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 			needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
 			if (rb >= needed)
 			{
-				/*
-				 * Technically, we could consume the message length
-				 * information at this point, but the extra write to shared
-				 * memory wouldn't be free and in most cases we would reap no
-				 * benefit.
-				 */
-				mqh->mqh_consume_pending = needed;
+				mqh->mqh_consume_pending += needed;
 				*nbytesp = nbytes;
 				*datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
 				return SHM_MQ_SUCCESS;
@@ -636,7 +637,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 			 */
 			mqh->mqh_expected_bytes = nbytes;
 			mqh->mqh_length_word_complete = true;
-			shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
+			mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
 			rb -= MAXALIGN(sizeof(Size));
 		}
 		else
@@ -655,7 +656,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 			}
 			Assert(mqh->mqh_buflen >= sizeof(Size));
 
-			/* Copy and consume partial length word. */
+			/* Copy partial length word; remember to consume it. */
 			if (mqh->mqh_partial_bytes + rb > sizeof(Size))
 				lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
 			else
@@ -663,7 +664,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 			memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
 				   lengthbytes);
 			mqh->mqh_partial_bytes += lengthbytes;
-			shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
+			mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
 			rb -= lengthbytes;
 
 			/* If we now have the whole word, we're ready to read payload. */
@@ -685,13 +686,13 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 		 * we need not copy the data and can return a pointer directly into
 		 * shared memory.
 		 */
-		res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata);
+		res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
 		if (res != SHM_MQ_SUCCESS)
 			return res;
 		if (rb >= nbytes)
 		{
 			mqh->mqh_length_word_complete = false;
-			mqh->mqh_consume_pending = MAXALIGN(nbytes);
+			mqh->mqh_consume_pending += MAXALIGN(nbytes);
 			*nbytesp = nbytes;
 			*datap = rawdata;
 			return SHM_MQ_SUCCESS;
@@ -731,13 +732,13 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 		mqh->mqh_partial_bytes += rb;
 
 		/*
-		 * Update count of bytes read, with alignment padding.  Note that this
-		 * will never actually insert any padding except at the end of a
-		 * message, because the buffer size is a multiple of MAXIMUM_ALIGNOF,
-		 * and each read and write is as well.
+		 * Update count of bytes that can be consumed, accounting for
+		 * alignment padding.  Note that this will never actually insert any
+		 * padding except at the end of a message, because the buffer size is
+		 * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
 		 */
 		Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
-		shm_mq_inc_bytes_read(mq, MAXALIGN(rb));
+		mqh->mqh_consume_pending += MAXALIGN(rb);
 
 		/* If we got all the data, exit the loop. */
 		if (mqh->mqh_partial_bytes >= nbytes)
@@ -745,7 +746,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 
 		/* Wait for some more data. */
 		still_needed = nbytes - mqh->mqh_partial_bytes;
-		res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
+		res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
 		if (res != SHM_MQ_SUCCESS)
 			return res;
 		if (rb > still_needed)
@@ -1012,9 +1013,10 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
  * is SHM_MQ_SUCCESS.
  */
 static shm_mq_result
-shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
+shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
 					 Size *nbytesp, void **datap)
 {
+	shm_mq	   *mq = mqh->mqh_queue;
 	Size		ringsize = mq->mq_ring_size;
 	uint64		used;
 	uint64		written;
@@ -1026,7 +1028,13 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
 
 		/* Get bytes written, so we can compute what's available to read. */
 		written = pg_atomic_read_u64(&mq->mq_bytes_written);
-		read = pg_atomic_read_u64(&mq->mq_bytes_read);
+
+		/*
+		 * Get bytes read.  Include bytes we could consume but have not yet
+		 * consumed.
+		 */
+		read = pg_atomic_read_u64(&mq->mq_bytes_read) +
+			mqh->mqh_consume_pending;
 		used = written - read;
 		Assert(used <= ringsize);
 		offset = read % (uint64) ringsize;
@@ -1057,6 +1065,16 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
 		if (mq->mq_detached)
 			return SHM_MQ_DETACHED;
 
+		/*
+		 * We didn't get enough data to satisfy the request, so mark any data
+		 * previously consumed as read to make more buffer space.
+		 */
+		if (mqh->mqh_consume_pending > 0)
+		{
+			shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
+			mqh->mqh_consume_pending = 0;
+		}
+
 		/* Skip manipulation of our latch if nowait = true. */
 		if (nowait)
 			return SHM_MQ_WOULD_BLOCK;
-- 
2.14.3 (Apple Git-98)

