At Tue, 21 Jun 2022 14:56:40 +0900 (JST), Kyotaro Horiguchi <horikyota....@gmail.com> wrote in
> By the way, I noticed that "libpq_pipeline uniqviol" intermittently
> fails for uncertain reasons.
>
> > result 574/575: pipeline aborted
> > ...........................................................
> > done writing
> >
> > libpq_pipeline:1531: got unexpected NULL

PQsendQueryPrepared() is called after the connection's state has moved
to PGASYNC_IDLE, so PQgetResult() returns NULL even though there are
still results to read.  If pqPipelineProcessQueue() doesn't move the
async state to PGASYNC_IDLE when the queue is empty, uniqviol can run
to the end, but that change breaks almost all of the other test items.

Finally, I found that the attached change to pqPipelineProcessQueue()
fixes the uniqviol failure and doesn't break the other tests.  However,
I don't yet understand what the change actually does... X(

It seems to me that something is wrong in the PQ_PIPELINE_ABORTED mode.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

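For illustration only, the empty-queue decision in the attached change can be modeled without libpq. The sketch below is a toy stand-in, not libpq code: ToyConn, ToyAsync, ToyPipeline and toy_process_queue() are hypothetical names, the command queue is reduced to a counter, and a non-empty queue is simplified to "go busy". Under those assumptions it shows that an aborted pipeline with an empty queue stays pipeline-idle, while a healthy one drops to really idle.

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-ins for libpq's internal states (hypothetical names). */
typedef enum { TOY_IDLE, TOY_PIPELINE_IDLE, TOY_BUSY } ToyAsync;
typedef enum { TOY_PIPE_OFF, TOY_PIPE_ON, TOY_PIPE_ABORTED } ToyPipeline;

typedef struct
{
	ToyAsync	async;		/* models conn->asyncStatus */
	ToyPipeline pipeline;	/* models conn->pipelineStatus */
	int			queued;		/* models "cmd_queue_head != NULL" as a count */
} ToyConn;

static void
toy_process_queue(ToyConn *conn)
{
	assert(conn != NULL);

	/* Only the pipeline-idle state asks for the next queued command. */
	if (conn->async != TOY_PIPELINE_IDLE)
		return;

	/* Not in pipeline mode: fall back to the plain idle state. */
	if (conn->pipeline == TOY_PIPE_OFF)
	{
		conn->async = TOY_IDLE;
		return;
	}

	/* Empty queue: go really idle unless the pipeline is aborted. */
	if (conn->queued == 0)
	{
		if (conn->pipeline != TOY_PIPE_ABORTED)
			conn->async = TOY_IDLE;
		return;
	}

	/* Simplification: a queued command just makes the connection busy. */
	conn->async = TOY_BUSY;
}
```

Calling toy_process_queue() on a ToyConn initialised to {TOY_PIPELINE_IDLE, TOY_PIPE_ABORTED, 0} leaves async at TOY_PIPELINE_IDLE; with TOY_PIPE_ON instead, it becomes TOY_IDLE.
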
From f0b69fbc4f708d3b844b672989dbf15f84ed0c9b Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 15 Jun 2022 19:56:41 +0200
Subject: [PATCH v6] Avoid going IDLE in pipeline mode

Introduce a new PGASYNC_PIPELINE_IDLE state, which helps PQgetResult
distinguish the case of "really idle".

This fixes the problem that ReadyForQuery is sent too soon, which caused
a CloseComplete to be received when in idle state.

XXX -- this is still WIP.

Co-authored-by: Kyotaro Horiguchi <horikyota....@gmail.com>
Reported-by: Daniele Varrazzo <daniele.varra...@gmail.com>
Discussion: https://postgr.es/m/ca+mi_8bvd0_cw3sumgwpvwdnzxy32itog_16tdyru_1s2gv...@mail.gmail.com
---
 src/interfaces/libpq/fe-connect.c             |  1 +
 src/interfaces/libpq/fe-exec.c                | 56 +++++++----
 src/interfaces/libpq/fe-protocol3.c           | 12 ---
 src/interfaces/libpq/libpq-int.h              |  3 +-
 .../modules/libpq_pipeline/libpq_pipeline.c   | 99 +++++++++++++++++++
 .../traces/simple_pipeline.trace              | 37 +++++++
 6 files changed, 175 insertions(+), 33 deletions(-)

diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 709ba15220..afd0bc809a 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -6751,6 +6751,7 @@ PQtransactionStatus(const PGconn *conn)
 {
 	if (!conn || conn->status != CONNECTION_OK)
 		return PQTRANS_UNKNOWN;
+	/* XXX what should we do here if status is PGASYNC_PIPELINE_IDLE? */
 	if (conn->asyncStatus != PGASYNC_IDLE)
 		return PQTRANS_ACTIVE;
 	return conn->xactStatus;
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 4180683194..3cf59e45e1 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1279,7 +1279,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
 			 * itself consume commands from the queue; if we're in any other
 			 * state, we don't have to do anything.
 			 */
-			if (conn->asyncStatus == PGASYNC_IDLE)
+			if (conn->asyncStatus == PGASYNC_IDLE ||
+				conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
 			{
 				resetPQExpBuffer(&conn->errorMessage);
 				pqPipelineProcessQueue(conn);
@@ -1642,9 +1643,9 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
 		return false;
 	}
 
-	/* Can't send while already busy, either, unless enqueuing for later */
-	if (conn->asyncStatus != PGASYNC_IDLE &&
-		conn->pipelineStatus == PQ_PIPELINE_OFF)
+	/* In non-pipeline mode, we can't send anything while already busy */
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF &&
+		conn->asyncStatus != PGASYNC_IDLE)
 	{
 		appendPQExpBufferStr(&conn->errorMessage,
 							 libpq_gettext("another command is already in progress\n"));
@@ -1667,11 +1668,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
 		switch (conn->asyncStatus)
 		{
 			case PGASYNC_IDLE:
+			case PGASYNC_PIPELINE_IDLE:
 			case PGASYNC_READY:
 			case PGASYNC_READY_MORE:
 			case PGASYNC_BUSY:
 				/* ok to queue */
 				break;
+
 			case PGASYNC_COPY_IN:
 			case PGASYNC_COPY_OUT:
 			case PGASYNC_COPY_BOTH:
@@ -1881,6 +1884,7 @@ PQsetSingleRowMode(PGconn *conn)
 	 */
 	if (!conn)
 		return 0;
+	/* XXX modify this? */
 	if (conn->asyncStatus != PGASYNC_BUSY)
 		return 0;
 	if (!conn->cmd_queue_head ||
@@ -2047,19 +2051,19 @@ PQgetResult(PGconn *conn)
 	{
 		case PGASYNC_IDLE:
 			res = NULL;			/* query is complete */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF)
-			{
-				/*
-				 * We're about to return the NULL that terminates the round of
-				 * results from the current query; prepare to send the results
-				 * of the next query when we're called next.  Also, since this
-				 * is the start of the results of the next query, clear any
-				 * prior error message.
-				 */
-				resetPQExpBuffer(&conn->errorMessage);
-				pqPipelineProcessQueue(conn);
-			}
 			break;
+		case PGASYNC_PIPELINE_IDLE:
+			Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
+
+			res = NULL;			/* query is complete */
+			/*
+			 * We're about to return the NULL that terminates the round of
+			 * results from the current query; prepare to send the results
+			 * of the next query when we're called next.
+			 */
+			pqPipelineProcessQueue(conn);
+			break;
+
 		case PGASYNC_READY:
 
 			/*
@@ -2080,7 +2084,7 @@ PQgetResult(PGconn *conn)
 				 * We're about to send the results of the current query.  Set
 				 * us idle now, and ...
 				 */
-				conn->asyncStatus = PGASYNC_IDLE;
+				conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
 
 				/*
 				 * ... in cases when we're sending a pipeline-sync result,
@@ -3008,17 +3012,28 @@ pqPipelineProcessQueue(PGconn *conn)
 		case PGASYNC_READY:
 		case PGASYNC_READY_MORE:
 		case PGASYNC_BUSY:
+		case PGASYNC_IDLE:
 			/* client still has to process current query or results */
 			return;
-		case PGASYNC_IDLE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* next query please */
 			break;
 	}
 
 	/* Nothing to do if not in pipeline mode, or queue is empty */
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
-		conn->cmd_queue_head == NULL)
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+	{
+		conn->asyncStatus = PGASYNC_IDLE;
 		return;
+	}
+
+	if (conn->cmd_queue_head == NULL)
+	{
+		if (conn->pipelineStatus != PQ_PIPELINE_ABORTED)
+			conn->asyncStatus = PGASYNC_IDLE;
+
+		return;
+	}
 
 	/* Initialize async result-accumulation state */
 	pqClearAsyncResult(conn);
@@ -3105,6 +3120,7 @@ PQpipelineSync(PGconn *conn)
 		case PGASYNC_READY_MORE:
 		case PGASYNC_BUSY:
 		case PGASYNC_IDLE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* OK to send sync */
 			break;
 	}
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 9ab3bf1fcb..bab8926a63 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn)
 				if (conn->asyncStatus != PGASYNC_IDLE)
 					return;
 
-				/*
-				 * We're also notionally not-IDLE when in pipeline mode the state
-				 * says "idle" (so we have completed receiving the results of one
-				 * query from the server and dispatched them to the application)
-				 * but another query is queued; yield back control to caller so
-				 * that they can initiate processing of the next query in the
-				 * queue.
-				 */
-				if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
-					conn->cmd_queue_head != NULL)
-					return;
-
 				/*
 				 * Unexpected message in IDLE state; need to recover somehow.
 				 * ERROR messages are handled using the notice processor;
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 334aea4b6e..44a65e41b7 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -224,7 +224,8 @@ typedef enum
 								 * query */
 	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
-	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
+	PGASYNC_COPY_BOTH,			/* Copy In/Out data transfer in progress */
+	PGASYNC_PIPELINE_IDLE,		/* Pipeline mode */
 } PGAsyncStatusType;
 
 /* Target server type (decoded value of target_session_attrs) */
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index c27c4e0ada..2bdd4e308f 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -968,15 +968,29 @@ test_prepared(PGconn *conn)
 	fprintf(stderr, "ok\n");
 }
 
+/* Notice processor: print them, and count how many we got */
+static void
+notice_processor(void *arg, const char *message)
+{
+	int		   *n_notices = (int *) arg;
+
+	(*n_notices)++;
+	fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
+}
+
 static void
 test_simple_pipeline(PGconn *conn)
 {
 	PGresult   *res = NULL;
 	const char *dummy_params[1] = {"1"};
 	Oid			dummy_param_oids[1] = {INT4OID};
+	PQnoticeProcessor oldproc;
+	int			n_notices = 0;
 
 	fprintf(stderr, "simple pipeline... ");
 
+	oldproc = PQsetNoticeProcessor(conn, notice_processor, &n_notices);
+
 	/*
 	 * Enter pipeline mode and dispatch a set of operations, which we'll then
 	 * process the results of as they come in.
@@ -1052,6 +1066,91 @@ test_simple_pipeline(PGconn *conn)
 	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
 		pg_fatal("Exiting pipeline mode didn't seem to work");
 
+	if (n_notices > 0)
+		pg_fatal("unexpected notice");
+
+
+	/* Try the same thing with PQsendQuery */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("expected NULL result");
+
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	PQclear(res);
+	res = NULL;
+
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	/*
+	 * Must not have got any notices here; note bug as described in
+	 * https://postgr.es/m/ca+mi_8bvd0_cw3sumgwpvwdnzxy32itog_16tdyru_1s2gv...@mail.gmail.com
+	 */
+	if (n_notices > 0)
+		pg_fatal("got %d notice(s)", n_notices);
+
+	PQsetNoticeProcessor(conn, oldproc, NULL);
+
+	/*
+	 * Send a second command when libpq is in "pipeline-idle" state.
+	 */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	if (PQsendQuery(conn, "SELECT 2") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	/* read terminating null from first query */
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
 	fprintf(stderr, "ok\n");
 }
 
diff --git a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
index 5c94749bc1..e1c0cba07e 100644
--- a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
+++ b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
@@ -9,4 +9,41 @@ B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
 B	11	DataRow	 1 1 '1'
 B	13	CommandComplete	 "SELECT 1"
 B	5	ReadyForQuery	 I
+F	16	Parse	 "" "SELECT 1" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+F	4	Sync
+B	4	CloseComplete
+B	5	ReadyForQuery	 I
+F	16	Parse	 "" "SELECT 1" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+F	16	Parse	 "" "SELECT 2" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	CloseComplete
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '2'
+B	13	CommandComplete	 "SELECT 1"
 F	4	Terminate
-- 
2.31.1