Hi all,

A year and a half ago, Heikki reported that libpq could run into an
infinite loop in a couple of code paths should an OOM happen.
The code paths are:
- BIND messages for getRowDescriptions(), or 'T' messages, fixed per 7b96bf44
- Code paths handling some calls to PQmakeEmptyPGresult(), fixed per 414bef3
- Start message for COPY command, not fixed yet.
The first two problems have been addressed, but not the last one. As
the previous thread became long and confusing, here is a revival of the
topic on a new thread, with a fresh patch and a fresh study of the
problem, submitted for this CF because it would be nice to get things
fixed, or improved in some cases, but see below...

Here is a summary of what this patch does, and what it improves in some cases:
1) COPY IN and COPY OUT: without the patch, these two remain stuck in
an infinite loop. With the patch, an "out of memory" error is
reported to the caller.
I also had a look at some community utilities, like pgloader and
pg_bulkload, though none of them use getCopyStart(). Do other
people have ideas of other things to look at? pgadmin, for example?
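
For readers who want the failure mode of 1) spelled out, here is a toy
model in C. It is not libpq code: ToyConn, toy_alloc(), toy_parse_one()
and toy_drain() are all made-up names, and the framing (one type byte,
then a 4-byte big-endian length that counts itself, then the payload) is
only meant to resemble protocol 3 messages. When the allocation fails and
the cursor is left where it was, the driver loop sees the same complete
message again and again; skipping the message and recording an error
lets the loop finish.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Toy model only: none of these names exist in libpq. */
typedef struct
{
	const unsigned char *buf;	/* bytes already received */
	size_t		len;			/* number of valid bytes in buf */
	size_t		start;			/* first unconsumed byte (role of inStart) */
	int			failed;			/* set once an error has been recorded */
} ToyConn;

/* Hypothetical allocator: fails for the next oom_budget calls. */
static int	oom_budget = 0;

static void *
toy_alloc(size_t n)
{
	if (oom_budget > 0)
	{
		oom_budget--;
		return NULL;
	}
	return malloc(n);
}

/*
 * Try to parse one message. Returns 1 if the message was consumed
 * (successfully or as a recorded error), 0 if no progress was made.
 */
static int
toy_parse_one(ToyConn *c, int discard_on_oom)
{
	const unsigned char *p;
	size_t		msglen;
	size_t		payload;
	void	   *copy;

	assert(c != NULL);
	if (c->len - c->start < 5)
		return 0;				/* header not complete yet */
	p = c->buf + c->start;
	msglen = ((size_t) p[1] << 24) | ((size_t) p[2] << 16) |
		((size_t) p[3] << 8) | (size_t) p[4];
	if (msglen < 4)
		return 0;				/* malformed length, give up */
	payload = msglen - 4;
	if (c->len - c->start < 5 + payload)
		return 0;				/* payload not complete yet */

	copy = toy_alloc(payload + 1);
	if (copy == NULL)
	{
		if (!discard_on_oom)
			return 0;			/* cursor stays put: same message next time */
		c->start += 5 + payload;	/* pretend we read the message */
		c->failed = 1;			/* and report the failure instead */
		return 1;
	}
	memcpy(copy, p + 5, payload);
	free(copy);
	c->start += 5 + payload;
	return 1;
}

/* Call the parser until the buffer is drained or the attempt cap is hit. */
static int
toy_drain(ToyConn *c, int discard_on_oom, int max_attempts)
{
	int			attempts = 0;

	while (c->start < c->len && attempts < max_attempts)
	{
		attempts++;
		toy_parse_one(c, discard_on_oom);
	}
	return attempts;
}
```

With a persistent OOM and discard_on_oom = 0, toy_drain() only stops
because of the attempt cap, which stands in for the infinite loop. If the
OOM is transient, the same setting lets the second attempt parse the
message silently, so the failure is never reported.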

Now, in the case of COPY_BOTH, let's take a couple of examples:
2) pg_receivexlog:
In the case of pg_receivexlog, the patch allows libpq to correctly
report an error back to the caller; pg_receivexlog will then try to
send START_REPLICATION again at its next loop:
pg_receivexlog: could not send replication command
"START_REPLICATION": out of memory
Without the patch, pg_receivexlog would just ignore the error, and at
the next iteration of ParseInput() the message obtained from the server
would be processed, because the message cursor does not move on in this
case.
3) pg_recvlogical:
Similarly pg_recvlogical fails with the following error with the patch attached:
pg_recvlogical: could not send replication command "START_REPLICATION
SLOT "toto" LOGICAL 0/0": out of memory
And after the failure it loops back to the next attempt, and is able
to perform its streaming stuff. Without the patch, similarly to
pg_receivexlog, COPY_BOTH would take the code path of ParseInput()
once again and it would treat the message, ignoring any OOM problems
on the way.
4) libpqwalreceiver: upon noticing the OOM error with the patch
attached, a standby would fail after sending START_REPLICATION, log
the OOM error properly, and later attempt to start a new WAL receiver
process to try again. Without this patch, the WAL receiver would still
fail to start, and log the following unhelpful message:
2016-03-01 14:07:00 JST [84058]: [22-1] db=,user=,app=,client= LOG:
invalid record length at 0/3000970
That's not really informative for the user :( Note though that in
this case the WAL receiver does not remain in an infinite loop: it
fails with this "invalid record length" error, is able to restart
properly, and starts streaming at its next attempt when the WAL
receiver is started again.
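
The caller-side behavior described in 2) and 3), where the error is
reported and the command is simply sent again at the next loop, can be
sketched as follows. This is again a toy model and not the actual
pg_receivexlog or pg_recvlogical code: ToyStatus, toy_start_streaming()
and toy_stream_with_retry() are hypothetical names, and the OOM is
simulated with a counter.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Toy model only: hypothetical names, not the libpq or pg_receivexlog API. */
typedef enum
{
	TOY_OK,
	TOY_OOM
} ToyStatus;

/* Simulated OOM: the next failures_left attempts fail. */
static int	failures_left = 0;

/* Pretend to send the replication command and report any error. */
static ToyStatus
toy_start_streaming(char *errbuf, size_t errlen)
{
	if (failures_left > 0)
	{
		failures_left--;
		snprintf(errbuf, errlen, "out of memory");
		return TOY_OOM;
	}
	errbuf[0] = '\0';
	return TOY_OK;
}

/*
 * Try up to max_tries times. Returns the number of the attempt that
 * succeeded, or 0 if all attempts failed. The last error seen, if any,
 * is copied into last_err.
 */
static int
toy_stream_with_retry(int max_tries, char *last_err, size_t errlen)
{
	char		err[64];
	int			attempt;

	assert(last_err != NULL && errlen > 0);
	last_err[0] = '\0';
	for (attempt = 1; attempt <= max_tries; attempt++)
	{
		if (toy_start_streaming(err, sizeof err) == TOY_OK)
			return attempt;
		/* The failure is visible to the user before the next attempt. */
		snprintf(last_err, errlen, "%s", err);
		fprintf(stderr, "could not send replication command: %s\n", err);
	}
	return 0;
}
```

The point of the sketch is only that the error string reaches the caller
before the retry, which is what the patch makes possible for the in-core
tools.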

So, that's where things stand. I still think it would be a
gain in terms of failure visibility to log this OOM failure properly
in all cases. It is true that some applications
may expect libpq to *not* return an error and to process the COPY start
message at the next loop of ParseInput(), though judging by what is
in core, things can be handled properly.

Thoughts? I have registered that in the CF app, and a patch is attached.
From df931f39efb9e7fd50108345ebeb53a098d0dc29 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@otacoo.com>
Date: Sun, 11 Oct 2015 20:46:40 +0900
Subject: [PATCH] Fix OOM error handling in COPY protocol of libpq

An OOM occurring while all the data needed by process from server has been
received can result in an infinite loop when parsing the output message.
getCopyStart is switched to discard a message read from server in case of
server and any subsequent ones when receiving data from server for
PGASYNC_COPY_OUT, and not wait for any additional data when input is expected
via PGASYNC_COPY_IN. In the case of PGASYNC_COPY_BOTH, both concepts apply.
 .../libpqwalreceiver/libpqwalreceiver.c            |  1 +
 src/interfaces/libpq/fe-exec.c                     | 12 ++++
 src/interfaces/libpq/fe-protocol3.c                | 71 +++++++++++++++++-----
 3 files changed, 69 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index f670957..5e79b78 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -445,6 +445,7 @@ libpqrcv_PQexec(const char *query)
 		if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
 			PQresultStatus(lastResult) == PGRES_COPY_OUT ||
 			PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
+			PQresultStatus(lastResult) == PGRES_FATAL_ERROR ||
 			PQstatus(streamConn) == CONNECTION_BAD)
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 41937c0..c99f193 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1804,6 +1804,10 @@ getCopyResult(PGconn *conn, ExecStatusType copytype)
 		return pqPrepareAsyncResult(conn);
+	/* If error has occurred, return a PGRES_FATAL_ERROR result */
+	if (conn->result && conn->result->resultStatus == PGRES_FATAL_ERROR)
+		return pqPrepareAsyncResult(conn);
 	/* If we have an async result for the COPY, return that */
 	if (conn->result && conn->result->resultStatus == copytype)
 		return pqPrepareAsyncResult(conn);
@@ -1994,6 +1998,9 @@ PQexecFinish(PGconn *conn)
 	 * We have to stop if we see copy in/out/both, however. We will resume
 	 * parsing after application performs the data transfer.
+	 * Stop if we are in copy mode and error has occurred, the pending results
+	 * will be discarded during next execution in PQexecStart.
+	 *
 	 * Also stop if the connection is lost (else we'll loop infinitely).
 	lastResult = NULL;
@@ -2023,6 +2030,11 @@ PQexecFinish(PGconn *conn)
 			result->resultStatus == PGRES_COPY_BOTH ||
 			conn->status == CONNECTION_BAD)
+		else if ((conn->asyncStatus == PGASYNC_COPY_IN ||
+				  conn->asyncStatus == PGASYNC_COPY_OUT  ||
+				  conn->asyncStatus == PGASYNC_COPY_BOTH) &&
+				 result->resultStatus == PGRES_FATAL_ERROR)
+			break;
 	return lastResult;
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 43898a4..21a1d9b 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -49,7 +49,9 @@ static int	getParamDescriptions(PGconn *conn, int msgLength);
 static int	getAnotherTuple(PGconn *conn, int msgLength);
 static int	getParameterStatus(PGconn *conn);
 static int	getNotify(PGconn *conn);
-static int	getCopyStart(PGconn *conn, ExecStatusType copytype);
+static int	getCopyStart(PGconn *conn,
+						 ExecStatusType copytype,
+						 int msgLength);
 static int	getReadyForQuery(PGconn *conn);
 static void reportErrorPosition(PQExpBuffer msg, const char *query,
 					int loc, int encoding);
@@ -372,22 +374,25 @@ pqParseInput3(PGconn *conn)
 				case 'G':		/* Start Copy In */
-					if (getCopyStart(conn, PGRES_COPY_IN))
+					if (getCopyStart(conn, PGRES_COPY_IN, msgLength))
+					/* getCopyStart() moves inStart itself */
 					conn->asyncStatus = PGASYNC_COPY_IN;
-					break;
+					continue;
 				case 'H':		/* Start Copy Out */
-					if (getCopyStart(conn, PGRES_COPY_OUT))
+					if (getCopyStart(conn, PGRES_COPY_OUT, msgLength))
+					/* getCopyStart() moves inStart itself */
 					conn->asyncStatus = PGASYNC_COPY_OUT;
 					conn->copy_already_done = 0;
-					break;
+					continue;
 				case 'W':		/* Start Copy Both */
-					if (getCopyStart(conn, PGRES_COPY_BOTH))
+					if (getCopyStart(conn, PGRES_COPY_BOTH, msgLength))
+					/* getCopyStart() moves inStart itself */
 					conn->asyncStatus = PGASYNC_COPY_BOTH;
 					conn->copy_already_done = 0;
-					break;
+					continue;
 				case 'd':		/* Copy Data */
@@ -1385,22 +1390,24 @@ getNotify(PGconn *conn)
  * parseInput already read the message type and length.
 static int
-getCopyStart(PGconn *conn, ExecStatusType copytype)
+getCopyStart(PGconn *conn, ExecStatusType copytype, int msgLength)
 	PGresult   *result;
 	int			nfields;
 	int			i;
+	const char *errmsg = NULL;
 	result = PQmakeEmptyPGresult(conn, copytype);
 	if (!result)
-		goto failure;
+		goto advance_and_error;
 	if (pqGetc(&conn->copy_is_binary, conn))
-		goto failure;
+		goto not_enough_data;
 	result->binary = conn->copy_is_binary;
 	/* the next two bytes are the number of fields	*/
-	if (pqGetInt(&(result->numAttributes), 2, conn))
-		goto failure;
+	if (pqGetInt(&result->numAttributes, 2, conn))
+		goto not_enough_data;
 	nfields = result->numAttributes;
 	/* allocate space for the attribute descriptors */
@@ -1409,7 +1416,7 @@ getCopyStart(PGconn *conn, ExecStatusType copytype)
 		result->attDescs = (PGresAttDesc *)
 			pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
 		if (!result->attDescs)
-			goto failure;
+			goto advance_and_error;
 		MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
@@ -1418,7 +1425,7 @@ getCopyStart(PGconn *conn, ExecStatusType copytype)
 		int			format;
 		if (pqGetInt(&format, 2, conn))
-			goto failure;
+			goto not_enough_data;
 		 * Since pqGetInt treats 2-byte integers as unsigned, we need to
@@ -1430,11 +1437,45 @@ getCopyStart(PGconn *conn, ExecStatusType copytype)
 	/* Success! */
 	conn->result = result;
+	/*
+	 * Advance inStart to show that the copy related message has been
+	 * processed.
+	 */
+	conn->inStart = conn->inCursor;
 	return 0;
 	return EOF;
+	/* Discard unsaved result, if any */
+	if (result && result != conn->result)
+		PQclear(result);
+	/* Discard the failed message by pretending we read it */
+	conn->inStart += 5 + msgLength;
+	/*
+	 * Replace partially constructed result with an error result. First
+	 * discard the old result to try to win back some memory.
+	 */
+	pqClearAsyncResult(conn);
+	/*
+	 * If preceding code didn't provide an error message, assume "out of
+	 * memory" was meant.  The advantage of having this special case is that
+	 * freeing the old result first greatly improves the odds that gettext()
+	 * will succeed in providing a translation.
+	 */
+	if (!errmsg)
+		errmsg = libpq_gettext("out of memory");
+	printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
+	pqSaveErrorResult(conn);
+	return 0;
