At Tue, 21 Jun 2022 14:56:40 +0900 (JST), Kyotaro Horiguchi 
<horikyota....@gmail.com> wrote in 
> By the way, I noticed that "libpq_pipeline uniqviol" intermittently
> fails for uncertain reasons.
> 
> > result 574/575: pipeline aborted
> > ...........................................................
> > done writing
> > 
> > libpq_pipeline:1531: got unexpected NULL

PQsendQueryPrepared() is called after the conection's state has moved
to PGASYNC_IDLE so PQgetResult returns NULL. But actually there are
results.  So, if pqPipelineProcessorQueue() doesn't move the async
state to PGASYNC_IDLE when queue is emtpy, uniqviol can run till the
end. But that change breaks almost all of other test items.

Finally, I found that the change in pqPipelineProcessorQueue() as
attached fixes the uniqviol failure and doesn't break other tests.
However, I don't understand what I did by the change for now... X(
It seems to me something's wrong in the PQ_PIPELINE_ABORTED mode..

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From f0b69fbc4f708d3b844b672989dbf15f84ed0c9b Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 15 Jun 2022 19:56:41 +0200
Subject: [PATCH v6] Avoid going IDLE in pipeline mode

Introduce a new PGASYNC_PIPELINE_IDLE state, which helps PQgetResult
distinguish the case of "really idle".

This fixes the problem that ReadyForQuery is sent too soon, which caused
a CloseComplete to be received when in idle state.

XXX -- this is still WIP.

Co-authored-by: Kyotaro Horiguchi <horikyota....@gmail.com>
Reported-by: Daniele Varrazzo <daniele.varra...@gmail.com>
Discussion: https://postgr.es/m/ca+mi_8bvd0_cw3sumgwpvwdnzxy32itog_16tdyru_1s2gv...@mail.gmail.com
---
 src/interfaces/libpq/fe-connect.c             |  1 +
 src/interfaces/libpq/fe-exec.c                | 56 +++++++----
 src/interfaces/libpq/fe-protocol3.c           | 12 ---
 src/interfaces/libpq/libpq-int.h              |  3 +-
 .../modules/libpq_pipeline/libpq_pipeline.c   | 99 +++++++++++++++++++
 .../traces/simple_pipeline.trace              | 37 +++++++
 6 files changed, 175 insertions(+), 33 deletions(-)

diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 709ba15220..afd0bc809a 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -6751,6 +6751,7 @@ PQtransactionStatus(const PGconn *conn)
 {
 	if (!conn || conn->status != CONNECTION_OK)
 		return PQTRANS_UNKNOWN;
+	/* XXX what should we do here if status is PGASYNC_PIPELINE_IDLE? */
 	if (conn->asyncStatus != PGASYNC_IDLE)
 		return PQTRANS_ACTIVE;
 	return conn->xactStatus;
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 4180683194..3cf59e45e1 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1279,7 +1279,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
 			 * itself consume commands from the queue; if we're in any other
 			 * state, we don't have to do anything.
 			 */
-			if (conn->asyncStatus == PGASYNC_IDLE)
+			if (conn->asyncStatus == PGASYNC_IDLE ||
+				conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
 			{
 				resetPQExpBuffer(&conn->errorMessage);
 				pqPipelineProcessQueue(conn);
@@ -1642,9 +1643,9 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
 		return false;
 	}
 
-	/* Can't send while already busy, either, unless enqueuing for later */
-	if (conn->asyncStatus != PGASYNC_IDLE &&
-		conn->pipelineStatus == PQ_PIPELINE_OFF)
+	/* In non-pipeline mode, we can't send anything while already busy */
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF &&
+		conn->asyncStatus != PGASYNC_IDLE)
 	{
 		appendPQExpBufferStr(&conn->errorMessage,
 							 libpq_gettext("another command is already in progress\n"));
@@ -1667,11 +1668,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
 		switch (conn->asyncStatus)
 		{
 			case PGASYNC_IDLE:
+			case PGASYNC_PIPELINE_IDLE:
 			case PGASYNC_READY:
 			case PGASYNC_READY_MORE:
 			case PGASYNC_BUSY:
 				/* ok to queue */
 				break;
+
 			case PGASYNC_COPY_IN:
 			case PGASYNC_COPY_OUT:
 			case PGASYNC_COPY_BOTH:
@@ -1881,6 +1884,7 @@ PQsetSingleRowMode(PGconn *conn)
 	 */
 	if (!conn)
 		return 0;
+	/* XXX modify this? */
 	if (conn->asyncStatus != PGASYNC_BUSY)
 		return 0;
 	if (!conn->cmd_queue_head ||
@@ -2047,19 +2051,19 @@ PQgetResult(PGconn *conn)
 	{
 		case PGASYNC_IDLE:
 			res = NULL;			/* query is complete */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF)
-			{
-				/*
-				 * We're about to return the NULL that terminates the round of
-				 * results from the current query; prepare to send the results
-				 * of the next query when we're called next.  Also, since this
-				 * is the start of the results of the next query, clear any
-				 * prior error message.
-				 */
-				resetPQExpBuffer(&conn->errorMessage);
-				pqPipelineProcessQueue(conn);
-			}
 			break;
+		case PGASYNC_PIPELINE_IDLE:
+			Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
+
+			res = NULL;			/* query is complete */
+			/*
+			 * We're about to return the NULL that terminates the round of
+			 * results from the current query; prepare to send the results
+			 * of the next query when we're called next.
+			 */
+			pqPipelineProcessQueue(conn);
+			break;
+
 		case PGASYNC_READY:
 
 			/*
@@ -2080,7 +2084,7 @@ PQgetResult(PGconn *conn)
 				 * We're about to send the results of the current query.  Set
 				 * us idle now, and ...
 				 */
-				conn->asyncStatus = PGASYNC_IDLE;
+				conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
 
 				/*
 				 * ... in cases when we're sending a pipeline-sync result,
@@ -3008,17 +3012,28 @@ pqPipelineProcessQueue(PGconn *conn)
 		case PGASYNC_READY:
 		case PGASYNC_READY_MORE:
 		case PGASYNC_BUSY:
+		case PGASYNC_IDLE:
 			/* client still has to process current query or results */
 			return;
-		case PGASYNC_IDLE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* next query please */
 			break;
 	}
 
 	/* Nothing to do if not in pipeline mode, or queue is empty */
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
-		conn->cmd_queue_head == NULL)
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+	{
+		conn->asyncStatus = PGASYNC_IDLE;
 		return;
+	}
+
+	if (conn->cmd_queue_head == NULL)
+	{
+		if (conn->pipelineStatus != PQ_PIPELINE_ABORTED)
+			conn->asyncStatus = PGASYNC_IDLE;
+			
+		return;
+	}
 
 	/* Initialize async result-accumulation state */
 	pqClearAsyncResult(conn);
@@ -3105,6 +3120,7 @@ PQpipelineSync(PGconn *conn)
 		case PGASYNC_READY_MORE:
 		case PGASYNC_BUSY:
 		case PGASYNC_IDLE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* OK to send sync */
 			break;
 	}
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 9ab3bf1fcb..bab8926a63 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn)
 			if (conn->asyncStatus != PGASYNC_IDLE)
 				return;
 
-			/*
-			 * We're also notionally not-IDLE when in pipeline mode the state
-			 * says "idle" (so we have completed receiving the results of one
-			 * query from the server and dispatched them to the application)
-			 * but another query is queued; yield back control to caller so
-			 * that they can initiate processing of the next query in the
-			 * queue.
-			 */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
-				conn->cmd_queue_head != NULL)
-				return;
-
 			/*
 			 * Unexpected message in IDLE state; need to recover somehow.
 			 * ERROR messages are handled using the notice processor;
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 334aea4b6e..44a65e41b7 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -224,7 +224,8 @@ typedef enum
 								 * query */
 	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
-	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
+	PGASYNC_COPY_BOTH,			/* Copy In/Out data transfer in progress */
+	PGASYNC_PIPELINE_IDLE,		/* Pipeline mode */
 } PGAsyncStatusType;
 
 /* Target server type (decoded value of target_session_attrs) */
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index c27c4e0ada..2bdd4e308f 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -968,15 +968,29 @@ test_prepared(PGconn *conn)
 	fprintf(stderr, "ok\n");
 }
 
+/* Notice processor: print them, and count how many we got */
+static void
+notice_processor(void *arg, const char *message)
+{
+	int	   *n_notices = (int *) arg;
+
+	(*n_notices)++;
+	fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
+}
+
 static void
 test_simple_pipeline(PGconn *conn)
 {
 	PGresult   *res = NULL;
 	const char *dummy_params[1] = {"1"};
 	Oid			dummy_param_oids[1] = {INT4OID};
+	PQnoticeProcessor oldproc;
+	int			n_notices = 0;
 
 	fprintf(stderr, "simple pipeline... ");
 
+	oldproc = PQsetNoticeProcessor(conn, notice_processor, &n_notices);
+
 	/*
 	 * Enter pipeline mode and dispatch a set of operations, which we'll then
 	 * process the results of as they come in.
@@ -1052,6 +1066,91 @@ test_simple_pipeline(PGconn *conn)
 	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
 		pg_fatal("Exiting pipeline mode didn't seem to work");
 
+	if (n_notices > 0)
+		pg_fatal("unexpected notice");
+
+
+	/* Try the same thing with PQsendQuery */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("expected NULL result");
+
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	PQclear(res);
+	res = NULL;
+
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	/*
+	 * Must not have got any notices here; note bug as described in
+	 * https://postgr.es/m/ca+mi_8bvd0_cw3sumgwpvwdnzxy32itog_16tdyru_1s2gv...@mail.gmail.com
+	 */
+	if (n_notices > 0)
+		pg_fatal("got %d notice(s)", n_notices);
+
+	PQsetNoticeProcessor(conn, oldproc, NULL);
+
+	/*
+	 * Send a second command when libpq is in "pipeline-idle" state.
+	 */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	if (PQsendQuery(conn, "SELECT 2") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	/* read terminating null from first query */
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
 	fprintf(stderr, "ok\n");
 }
 
diff --git a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
index 5c94749bc1..e1c0cba07e 100644
--- a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
+++ b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
@@ -9,4 +9,41 @@ B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
 B	11	DataRow	 1 1 '1'
 B	13	CommandComplete	 "SELECT 1"
 B	5	ReadyForQuery	 I
+F	16	Parse	 "" "SELECT 1" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+F	4	Sync
+B	4	CloseComplete
+B	5	ReadyForQuery	 I
+F	16	Parse	 "" "SELECT 1" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+F	16	Parse	 "" "SELECT 2" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	CloseComplete
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '2'
+B	13	CommandComplete	 "SELECT 1"
 F	4	Terminate
-- 
2.31.1

Reply via email to