At Thu, 16 Jun 2022 10:34:22 +0900 (JST), Kyotaro Horiguchi
<[email protected]> wrote in
> PQgetResult() resets the state to IDLE while in pipeline mode.
>
> fe-exec.c:2171
>
> > if (conn->pipelineStatus != PQ_PIPELINE_OFF)
> > {
> > /*
> > * We're about to send the results of the
> > current query. Set
> > * us idle now, and ...
> > */
> > conn->asyncStatus = PGASYNC_IDLE;
>
> And actually that code lets the connection state enter IDLE before
> CloseComplete is received. In the test case I posted, the following happens.
>
> PQsendQuery(conn, "SELECT 1;");
> PQsendFlushRequest(conn);
> PQgetResult(conn); // state enters IDLE, reads down to
> <CommandComplete>
> PQgetResult(conn); // reads <CloseComplete comes>
> PQpipelineSync(conn); // sync too late
>
> The pipeline feature seems intended to allow PQgetResult() to be called
> before PQpipelineSync(), and it also seems to allow calling
> PQpipelineSync() after PQgetResult().
>
> I haven't come up with a valid *fix* for this flow yet.
Attached is a crude patch that separates the pipeline-idle state (a new
PGASYNC_PIPELINE_IDLE) from PGASYNC_IDLE. I haven't found a better way.
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 0ff563f59a..261ccc3ed4 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -968,15 +968,27 @@ test_prepared(PGconn *conn)
fprintf(stderr, "ok\n");
}
+static int n_notice;
+static void
+notice_processor(void *arg, const char *message)
+{
+ n_notice++;
+ fprintf(stderr, "%s", message);
+}
+
static void
test_simple_pipeline(PGconn *conn)
{
PGresult *res = NULL;
const char *dummy_params[1] = {"1"};
Oid dummy_param_oids[1] = {INT4OID};
-
+ PQnoticeProcessor oldproc;
+
fprintf(stderr, "simple pipeline... ");
+ n_notice = 0;
+ oldproc = PQsetNoticeProcessor(conn, notice_processor, NULL);
+
/*
* Enter pipeline mode and dispatch a set of operations, which we'll then
* process the results of as they come in.
@@ -1052,6 +1064,51 @@ test_simple_pipeline(PGconn *conn)
if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
pg_fatal("Exiting pipeline mode didn't seem to work");
+ if (n_notice > 0)
+ pg_fatal("unexpected notice");
+
+ /* Try the same thing with PQsendQuery */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+ if (PQsendQuery(conn, "SELECT 1;") != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ PQsendFlushRequest(conn);
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Unexpected result code %s from first pipeline item",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("expected NULL result");
+
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+ PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+ PQclear(res);
+ res =NULL;
+
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+ PQerrorMessage(conn));
+
+ if (n_notice > 0)
+ pg_fatal("unexpected notice");
+
+ PQsetNoticeProcessor(conn, oldproc, NULL);
+
fprintf(stderr, "ok\n");
}
diff --git a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
index 5c94749bc1..1612c371c0 100644
--- a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
+++ b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
@@ -9,4 +9,18 @@ B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
B 11 DataRow 1 1 '1'
B 13 CommandComplete "SELECT 1"
B 5 ReadyForQuery I
+F 17 Parse "" "SELECT 1;" 0
+F 12 Bind "" "" 0 0 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 6 Close P ""
+F 4 Flush
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '1'
+B 13 CommandComplete "SELECT 1"
+F 4 Sync
+B 4 CloseComplete
+B 5 ReadyForQuery I
F 4 Terminate
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 919cf5741d..a8999911d7 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1380,7 +1380,7 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
* itself consume commands from the queue; if we're in any other
* state, we don't have to do anything.
*/
- if (conn->asyncStatus == PGASYNC_IDLE)
+ if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
pqPipelineProcessQueue(conn);
break;
}
@@ -1778,6 +1778,10 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("cannot queue commands during COPY\n"));
return false;
+ case PGASYNC_PIPELINE_IDLE:
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("unexpected PGASYNC_PIPELINE_IDLE state during COPY\n"));
+ return false;
}
}
else
@@ -2144,15 +2148,17 @@ PQgetResult(PGconn *conn)
{
case PGASYNC_IDLE:
res = NULL; /* query is complete */
- if (conn->pipelineStatus != PQ_PIPELINE_OFF)
- {
- /*
- * We're about to return the NULL that terminates the round of
- * results from the current query; prepare to send the results
- * of the next query when we're called next.
- */
- pqPipelineProcessQueue(conn);
- }
+ break;
+ case PGASYNC_PIPELINE_IDLE:
+ Assert (conn->pipelineStatus != PQ_PIPELINE_OFF);
+
+ res = NULL; /* query is complete */
+ /*
+ * We're about to return the NULL that terminates the round of
+ * results from the current query; prepare to send the results
+ * of the next query when we're called next.
+ */
+ pqPipelineProcessQueue(conn);
break;
case PGASYNC_READY:
@@ -2174,7 +2180,7 @@ PQgetResult(PGconn *conn)
* We're about to send the results of the current query. Set
* us idle now, and ...
*/
- conn->asyncStatus = PGASYNC_IDLE;
+ conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
/*
* ... in cases when we're sending a pipeline-sync result,
@@ -3090,9 +3096,10 @@ pqPipelineProcessQueue(PGconn *conn)
case PGASYNC_READY:
case PGASYNC_READY_MORE:
case PGASYNC_BUSY:
+ case PGASYNC_IDLE:
/* client still has to process current query or results */
return;
- case PGASYNC_IDLE:
+ case PGASYNC_PIPELINE_IDLE:
/* next query please */
break;
}
@@ -3100,7 +3107,10 @@ pqPipelineProcessQueue(PGconn *conn)
/* Nothing to do if not in pipeline mode, or queue is empty */
if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
conn->cmd_queue_head == NULL)
+ {
+ conn->asyncStatus = PGASYNC_IDLE;
return;
+ }
/*
* Reset the error state. This and the next couple of steps correspond to
@@ -3193,6 +3203,7 @@ PQpipelineSync(PGconn *conn)
case PGASYNC_READY_MORE:
case PGASYNC_BUSY:
case PGASYNC_IDLE:
+ case PGASYNC_PIPELINE_IDLE:
/* OK to send sync */
break;
}
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 10c76daf6e..0d60e8c5c0 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn)
if (conn->asyncStatus != PGASYNC_IDLE)
return;
- /*
- * We're also notionally not-IDLE when in pipeline mode the state
- * says "idle" (so we have completed receiving the results of one
- * query from the server and dispatched them to the application)
- * but another query is queued; yield back control to caller so
- * that they can initiate processing of the next query in the
- * queue.
- */
- if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
- conn->cmd_queue_head != NULL)
- return;
-
/*
* Unexpected message in IDLE state; need to recover somehow.
* ERROR messages are handled using the notice processor;
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 3db6a17db4..7fcad1dd41 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -225,7 +225,8 @@ typedef enum
* query */
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
- PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
+ PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */
+ PGASYNC_PIPELINE_IDLE, /* Idle between commands in pipeline mode */
} PGAsyncStatusType;
/* Target server type (decoded value of target_session_attrs) */