At Thu, 16 Jun 2022 10:34:22 +0900 (JST), Kyotaro Horiguchi <horikyota....@gmail.com> wrote in > PQgetResult() resets the state to IDLE while in pipeline mode. > > fe-exec.c:2171 > > > if (conn->pipelineStatus != PQ_PIPELINE_OFF) > > { > > /* > > * We're about to send the results of the > > current query. Set > > * us idle now, and ... > > */ > > conn->asyncStatus = PGASYNC_IDLE; > > And actually that code lets the connection state enter IDLE before > CloseComplete is received. In the test case I posted, the following happens. > > PQsendQuery(conn, "SELECT 1;"); > PQsendFlushRequest(conn); > PQgetResult(conn); // state enters IDLE, reads down to > <CommandComplete> > PQgetResult(conn); // reads <CloseComplete> > PQpipelineSync(conn); // sync too late > > The pipeline feature seems intended to allow PQgetResult() to be called before > PQpipelineSync(). It also seems to allow calling PQpipelineSync() after > PQgetResult(). > > I haven't come up with a valid *fix* for this flow.
Attached is a crude patch that separates the pipeline-idle state from PGASYNC_IDLE by introducing a new PGASYNC_PIPELINE_IDLE status. I haven't found a better way. Regards, -- Kyotaro Horiguchi NTT Open Source Software Center
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c index 0ff563f59a..261ccc3ed4 100644 --- a/src/test/modules/libpq_pipeline/libpq_pipeline.c +++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c @@ -968,15 +968,27 @@ test_prepared(PGconn *conn) fprintf(stderr, "ok\n"); } +static int n_notice; +static void +notice_processor(void *arg, const char *message) +{ + n_notice++; + fprintf(stderr, "%s", message); +} + static void test_simple_pipeline(PGconn *conn) { PGresult *res = NULL; const char *dummy_params[1] = {"1"}; Oid dummy_param_oids[1] = {INT4OID}; - + PQnoticeProcessor oldproc; + fprintf(stderr, "simple pipeline... "); + n_notice = 0; + oldproc = PQsetNoticeProcessor(conn, notice_processor, NULL); + /* * Enter pipeline mode and dispatch a set of operations, which we'll then * process the results of as they come in. @@ -1052,6 +1064,51 @@ test_simple_pipeline(PGconn *conn) if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF) pg_fatal("Exiting pipeline mode didn't seem to work"); + if (n_notice > 0) + pg_fatal("unexpected notice"); + + /* Try the same thing with PQsendQuery */ + if (PQenterPipelineMode(conn) != 1) + pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn)); + + if (PQsendQuery(conn, "SELECT 1;") != 1) + pg_fatal("failed to send query: %s", PQerrorMessage(conn)); + PQsendFlushRequest(conn); + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when there's a pipeline item: %s", + PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("Unexpected result code %s from first pipeline item", + PQresStatus(PQresultStatus(res))); + PQclear(res); + + res = PQgetResult(conn); + if (res != NULL) + pg_fatal("expected NULL result"); + + if (PQpipelineSync(conn) != 1) + pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn)); + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when there's a pipeline 
item: %s", + PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_PIPELINE_SYNC) + pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s", + PQresStatus(PQresultStatus(res)), PQerrorMessage(conn)); + PQclear(res); + res =NULL; + + if (PQexitPipelineMode(conn) != 1) + pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s", + PQerrorMessage(conn)); + + if (n_notice > 0) + pg_fatal("unexpected notice"); + + PQsetNoticeProcessor(conn, oldproc, NULL); + fprintf(stderr, "ok\n"); } diff --git a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace index 5c94749bc1..1612c371c0 100644 --- a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace +++ b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace @@ -9,4 +9,18 @@ B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0 B 11 DataRow 1 1 '1' B 13 CommandComplete "SELECT 1" B 5 ReadyForQuery I +F 17 Parse "" "SELECT 1;" 0 +F 12 Bind "" "" 0 0 0 +F 6 Describe P "" +F 9 Execute "" 0 +F 6 Close P "" +F 4 Flush +B 4 ParseComplete +B 4 BindComplete +B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0 +B 11 DataRow 1 1 '1' +B 13 CommandComplete "SELECT 1" +F 4 Sync +B 4 CloseComplete +B 5 ReadyForQuery I F 4 Terminate
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 919cf5741d..a8999911d7 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -1380,7 +1380,7 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry) * itself consume commands from the queue; if we're in any other * state, we don't have to do anything. */ - if (conn->asyncStatus == PGASYNC_IDLE) + if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE) pqPipelineProcessQueue(conn); break; } @@ -1778,6 +1778,10 @@ PQsendQueryStart(PGconn *conn, bool newQuery) appendPQExpBufferStr(&conn->errorMessage, libpq_gettext("cannot queue commands during COPY\n")); return false; + case PGASYNC_PIPELINE_IDLE: + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("unexpected PGASYNC_PIPELINE_IDLE state during COPY\n")); + return false; } } else @@ -2144,15 +2148,17 @@ PQgetResult(PGconn *conn) { case PGASYNC_IDLE: res = NULL; /* query is complete */ - if (conn->pipelineStatus != PQ_PIPELINE_OFF) - { - /* - * We're about to return the NULL that terminates the round of - * results from the current query; prepare to send the results - * of the next query when we're called next. - */ - pqPipelineProcessQueue(conn); - } + break; + case PGASYNC_PIPELINE_IDLE: + Assert (conn->pipelineStatus != PQ_PIPELINE_OFF); + + res = NULL; /* query is complete */ + /* + * We're about to return the NULL that terminates the round of + * results from the current query; prepare to send the results + * of the next query when we're called next. + */ + pqPipelineProcessQueue(conn); break; case PGASYNC_READY: @@ -2174,7 +2180,7 @@ PQgetResult(PGconn *conn) * We're about to send the results of the current query. Set * us idle now, and ... */ - conn->asyncStatus = PGASYNC_IDLE; + conn->asyncStatus = PGASYNC_PIPELINE_IDLE; /* * ... 
in cases when we're sending a pipeline-sync result, @@ -3090,9 +3096,10 @@ pqPipelineProcessQueue(PGconn *conn) case PGASYNC_READY: case PGASYNC_READY_MORE: case PGASYNC_BUSY: + case PGASYNC_IDLE: /* client still has to process current query or results */ return; - case PGASYNC_IDLE: + case PGASYNC_PIPELINE_IDLE: /* next query please */ break; } @@ -3100,7 +3107,10 @@ pqPipelineProcessQueue(PGconn *conn) /* Nothing to do if not in pipeline mode, or queue is empty */ if (conn->pipelineStatus == PQ_PIPELINE_OFF || conn->cmd_queue_head == NULL) + { + conn->asyncStatus = PGASYNC_IDLE; return; + } /* * Reset the error state. This and the next couple of steps correspond to @@ -3193,6 +3203,7 @@ PQpipelineSync(PGconn *conn) case PGASYNC_READY_MORE: case PGASYNC_BUSY: case PGASYNC_IDLE: + case PGASYNC_PIPELINE_IDLE: /* OK to send sync */ break; } diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 10c76daf6e..0d60e8c5c0 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn) if (conn->asyncStatus != PGASYNC_IDLE) return; - /* - * We're also notionally not-IDLE when in pipeline mode the state - * says "idle" (so we have completed receiving the results of one - * query from the server and dispatched them to the application) - * but another query is queued; yield back control to caller so - * that they can initiate processing of the next query in the - * queue. - */ - if (conn->pipelineStatus != PQ_PIPELINE_OFF && - conn->cmd_queue_head != NULL) - return; - /* * Unexpected message in IDLE state; need to recover somehow. 
* ERROR messages are handled using the notice processor; diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 3db6a17db4..7fcad1dd41 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -225,7 +225,8 @@ typedef enum * query */ PGASYNC_COPY_IN, /* Copy In data transfer in progress */ PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */ - PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */ + PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */ + PGASYNC_PIPELINE_IDLE, /* */ } PGAsyncStatusType; /* Target server type (decoded value of target_session_attrs) */