From 6427010b04e4dc2becb9d79be6cdf07804e74e34 Mon Sep 17 00:00:00 2001
From: Melih Mutlu <m.melihmutlu@gmail.com>
Date: Mon, 20 Nov 2023 11:20:52 +0300
Subject: [PATCH] Flush large data immediately in pqcomm

If the data is larger than send buffer size in pqcomm, we're sure that
the send buffer will be flushed at least once to fit the data depending
on how large the data is.

Instead of memcpy'ing and then flushing data larger than 8K, this patch
changes socket_putmessage logic to flush large data immediately without
unnecessarily copying it to the pqcom send buffer.
---
 src/backend/libpq/pqcomm.c | 48 ++++++++++++++++++++++++++++++++------
 1 file changed, 41 insertions(+), 7 deletions(-)

diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 522584e597..b1c7221a18 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -146,6 +146,7 @@ static int	socket_putmessage(char msgtype, const char *s, size_t len);
 static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
 static int	internal_putbytes(const char *s, size_t len);
 static int	internal_flush(void);
+static int	internal_flush_buffer(const char *s, int *start, int *end);
 
 static int	Lock_AF_UNIX(const char *unixSocketDir, const char *unixSocketPath);
 static int	Setup_AF_UNIX(const char *sock_path);
@@ -1333,11 +1334,25 @@ socket_flush(void)
  */
 static int
 internal_flush(void)
+{
+	/* flush the pending output from send buffer. */
+	return internal_flush_buffer(PqSendBuffer, &PqSendStart, &PqSendPointer);
+}
+
+/* --------------------------------
+ *		internal_flush_buffer - flush the given buffer content
+ *
+ * Returns 0 if OK (meaning everything was sent, or operation would block
+ * and the socket is in non-blocking mode), or EOF if trouble.
+ * --------------------------------
+ */
+static int
+internal_flush_buffer(const char *s, int *start, int *end)
 {
 	static int	last_reported_send_errno = 0;
 
-	char	   *bufptr = PqSendBuffer + PqSendStart;
-	char	   *bufend = PqSendBuffer + PqSendPointer;
+	char	   *bufptr = (char*) s + *start;
+	char	   *bufend = (char*) s + *end;
 
 	while (bufptr < bufend)
 	{
@@ -1383,7 +1398,7 @@ internal_flush(void)
 			 * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
 			 * the connection.
 			 */
-			PqSendStart = PqSendPointer = 0;
+			*start = *end = 0;
 			ClientConnectionLost = 1;
 			InterruptPending = 1;
 			return EOF;
@@ -1391,10 +1406,10 @@ internal_flush(void)
 
 		last_reported_send_errno = 0;	/* reset after any successful send */
 		bufptr += r;
-		PqSendStart += r;
+		*start += r;
 	}
 
-	PqSendStart = PqSendPointer = 0;
+	*start = *end = 0;
 	return 0;
 }
 
@@ -1470,6 +1485,7 @@ socket_putmessage(char msgtype, const char *s, size_t len)
 	if (PqCommBusy)
 		return 0;
 	PqCommBusy = true;
+	
 	if (internal_putbytes(&msgtype, 1))
 		goto fail;
 
@@ -1477,8 +1493,26 @@ socket_putmessage(char msgtype, const char *s, size_t len)
 	if (internal_putbytes((char *) &n32, 4))
 		goto fail;
 
-	if (internal_putbytes(s, len))
-		goto fail;
+	if (len >= PqSendBufferSize)
+	{
+		int start = 0;
+		int end = len;
+
+		socket_set_nonblocking(false);
+		/* send pending data first */
+		if (internal_flush())
+			goto fail;
+
+		/* send the large buffer without copying it into PqSendBuffer */
+		if (internal_flush_buffer(s, &start, &end))
+			goto fail;
+	}
+	else
+	{
+		if (internal_putbytes(s, len))
+			goto fail;
+	}
+
 	PqCommBusy = false;
 	return 0;
 
-- 
2.34.1

