On 2022-Jun-16, Kyotaro Horiguchi wrote:

> The attached is a crude patch to separate the state for PIPELINE-IDLE
> from PGASYNC_IDLE. I haven't found a better way..

Ah, yeah, this might be a way to fix this.

Something very similar to a PIPELINE_IDLE mode was present in Craig's
initial patch for pipeline mode. However, I fought very hard to remove
it, because it seemed to me that failing to handle it correctly
everywhere would lead to more bugs than not having it. (Indeed, there
were some.)

However, I see now that your patch would not only fix this bug, but also
let us remove the ugly "notionally not-idle" bit in fe-protocol3.c,
which makes me ecstatic. So let's push forward with this.

However, this means we'll have to go over all places that use
asyncStatus to ensure that they all handle the new value correctly.

I did find one bug in your patch: in the switch for asyncStatus in
PQsendQueryStart, you introduce a new error message. With the current
tests, that never fires, which is telling us that our coverage is not
complete. But with the right sequence of libpq calls, which the
attached adds (note that it's for REL_14_STABLE), that can be hit, and
it's easy to see that throwing an error there is a mistake. The right
action to take there is to let the action through.

Others to think about:

PQisBusy (I think no changes are needed),
PQfn (I think it should accept a call in PGASYNC_PIPELINE_IDLE mode;
fully untested in pipeline mode),
PQexitPipelineMode (I think it needs to return error; needs test case),
PQsendFlushRequest (I think it should let through; ditto).

I also attach a patch to make the test suite use Test::Differences, if
available. It makes the diffs of the traces much easier to read, when
they fail. (I wish for a simple way to set the context size, but that
would need a shim routine that I'm currently too lazy to write.)

-- 
Álvaro Herrera 48°01'N 7°57'E — https://www.EnterpriseDB.com/

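To make the idea of a separate "pipeline idle" state concrete, here is a toy sketch in C. It is not libpq code: ToyAsyncStatus, ToyConn and the toy_* functions are made-up names, and the logic is only my simplified reading of how the extra state lets each caller distinguish "really idle" from "done with one query, more may follow".

```c
#include <assert.h>
#include <stdbool.h>

/* Toy stand-ins, assumed for illustration; NOT the real libpq types. */
typedef enum
{
	TOY_IDLE,					/* nothing in flight, nothing queued */
	TOY_BUSY,					/* waiting for results of current command */
	TOY_READY,					/* a result is ready for the caller */
	TOY_PIPELINE_IDLE			/* pipeline mode: done with one command, but
								 * not "really idle" */
} ToyAsyncStatus;

typedef struct
{
	ToyAsyncStatus status;
	bool		pipeline_on;
	int			queued;			/* commands still waiting to be processed */
} ToyConn;

/* May the caller send another command in this state? */
static bool
toy_can_send(const ToyConn *conn)
{
	/* Outside pipeline mode, only a truly idle connection may send. */
	if (!conn->pipeline_on)
		return conn->status == TOY_IDLE;

	/* Listing every value makes a missing case easy to spot. */
	switch (conn->status)
	{
		case TOY_IDLE:
		case TOY_PIPELINE_IDLE:
		case TOY_READY:
		case TOY_BUSY:
			return true;		/* ok to queue */
	}
	return false;
}

/* Is a server message arriving now expected by some command? */
static bool
toy_message_expected(const ToyConn *conn)
{
	/* With the extra state, no "notionally not-idle" special case. */
	return conn->status != TOY_IDLE;
}

/* Start the next queued command, or fall back to "really idle". */
static void
toy_process_queue(ToyConn *conn)
{
	assert(conn->queued >= 0);

	if (conn->status != TOY_PIPELINE_IDLE)
		return;					/* caller still has work in this state */

	if (!conn->pipeline_on || conn->queued == 0)
	{
		conn->status = TOY_IDLE;
		return;
	}

	conn->queued--;
	conn->status = TOY_BUSY;
}
```

The point of the toy is only that the decision "is this really idle?" lives in one place (toy_process_queue), while the other functions just switch on the state; whether the real functions listed above should behave the same way is exactly what needs review.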
From f96058efc70c3f72bc910308a9c2f64e4d3c7d8e Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 15 Jun 2022 19:56:41 +0200
Subject: [PATCH v5 1/2] Avoid going IDLE in pipeline mode

Introduce a new PGASYNC_PIPELINE_IDLE state, which helps PQgetResult
distinguish the case of "really idle".

This fixes the problem that ReadyForQuery is sent too soon, which caused
a CloseComplete to be received when in idle state.

XXX -- this is still WIP.

Co-authored-by: Kyotaro Horiguchi <horikyota....@gmail.com>
Reported-by: Daniele Varrazzo <daniele.varra...@gmail.com>
Discussion: https://postgr.es/m/ca+mi_8bvd0_cw3sumgwpvwdnzxy32itog_16tdyru_1s2gv...@mail.gmail.com
---
 src/interfaces/libpq/fe-connect.c             |  1 +
 src/interfaces/libpq/fe-exec.c                | 45 +++++----
 src/interfaces/libpq/fe-protocol3.c           | 12 ---
 src/interfaces/libpq/libpq-int.h              |  3 +-
 .../modules/libpq_pipeline/libpq_pipeline.c   | 99 +++++++++++++++++++
 .../traces/simple_pipeline.trace              | 37 +++++++
 6 files changed, 166 insertions(+), 31 deletions(-)

diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 709ba15220..afd0bc809a 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -6751,6 +6751,7 @@ PQtransactionStatus(const PGconn *conn)
 {
 	if (!conn || conn->status != CONNECTION_OK)
 		return PQTRANS_UNKNOWN;
+	/* XXX what should we do here if status is PGASYNC_PIPELINE_IDLE? */
 	if (conn->asyncStatus != PGASYNC_IDLE)
 		return PQTRANS_ACTIVE;
 	return conn->xactStatus;
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 4180683194..59f2e7f724 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1279,7 +1279,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
 			 * itself consume commands from the queue; if we're in any other
 			 * state, we don't have to do anything.
 			 */
-			if (conn->asyncStatus == PGASYNC_IDLE)
+			if (conn->asyncStatus == PGASYNC_IDLE ||
+				conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
 			{
 				resetPQExpBuffer(&conn->errorMessage);
 				pqPipelineProcessQueue(conn);
@@ -1642,9 +1643,9 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
 		return false;
 	}
 
-	/* Can't send while already busy, either, unless enqueuing for later */
-	if (conn->asyncStatus != PGASYNC_IDLE &&
-		conn->pipelineStatus == PQ_PIPELINE_OFF)
+	/* In non-pipeline mode, we can't send anything while already busy */
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF &&
+		conn->asyncStatus != PGASYNC_IDLE)
 	{
 		appendPQExpBufferStr(&conn->errorMessage,
 							 libpq_gettext("another command is already in progress\n"));
@@ -1667,11 +1668,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
 		switch (conn->asyncStatus)
 		{
 			case PGASYNC_IDLE:
+			case PGASYNC_PIPELINE_IDLE:
 			case PGASYNC_READY:
 			case PGASYNC_READY_MORE:
 			case PGASYNC_BUSY:
 				/* ok to queue */
 				break;
+
 			case PGASYNC_COPY_IN:
 			case PGASYNC_COPY_OUT:
 			case PGASYNC_COPY_BOTH:
@@ -1881,6 +1884,7 @@ PQsetSingleRowMode(PGconn *conn)
 	 */
 	if (!conn)
 		return 0;
+	/* XXX modify this? */
 	if (conn->asyncStatus != PGASYNC_BUSY)
 		return 0;
 	if (!conn->cmd_queue_head ||
@@ -2047,19 +2051,19 @@ PQgetResult(PGconn *conn)
 	{
 		case PGASYNC_IDLE:
 			res = NULL;			/* query is complete */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF)
-			{
-				/*
-				 * We're about to return the NULL that terminates the round of
-				 * results from the current query; prepare to send the results
-				 * of the next query when we're called next. Also, since this
-				 * is the start of the results of the next query, clear any
-				 * prior error message.
-				 */
-				resetPQExpBuffer(&conn->errorMessage);
-				pqPipelineProcessQueue(conn);
-			}
 			break;
+		case PGASYNC_PIPELINE_IDLE:
+			Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
+
+			res = NULL;			/* query is complete */
+			/*
+			 * We're about to return the NULL that terminates the round of
+			 * results from the current query; prepare to send the results
+			 * of the next query when we're called next.
+			 */
+			pqPipelineProcessQueue(conn);
+			break;
+
 		case PGASYNC_READY:
 
 			/*
@@ -2080,7 +2084,7 @@ PQgetResult(PGconn *conn)
 				 * We're about to send the results of the current query. Set
 				 * us idle now, and ...
 				 */
-				conn->asyncStatus = PGASYNC_IDLE;
+				conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
 
 				/*
 				 * ... in cases when we're sending a pipeline-sync result,
@@ -3008,9 +3012,10 @@ pqPipelineProcessQueue(PGconn *conn)
 		case PGASYNC_READY:
 		case PGASYNC_READY_MORE:
 		case PGASYNC_BUSY:
+		case PGASYNC_IDLE:
 			/* client still has to process current query or results */
 			return;
-		case PGASYNC_IDLE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* next query please */
 			break;
 	}
@@ -3018,7 +3023,10 @@ pqPipelineProcessQueue(PGconn *conn)
 	/* Nothing to do if not in pipeline mode, or queue is empty */
 	if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
 		conn->cmd_queue_head == NULL)
+	{
+		conn->asyncStatus = PGASYNC_IDLE;
 		return;
+	}
 
 	/* Initialize async result-accumulation state */
 	pqClearAsyncResult(conn);
@@ -3105,6 +3113,7 @@ PQpipelineSync(PGconn *conn)
 		case PGASYNC_READY_MORE:
 		case PGASYNC_BUSY:
 		case PGASYNC_IDLE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* OK to send sync */
 			break;
 	}
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 9ab3bf1fcb..bab8926a63 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn)
 			if (conn->asyncStatus != PGASYNC_IDLE)
 				return;
 
-			/*
-			 * We're also notionally not-IDLE when in pipeline mode the state
-			 * says "idle" (so we have completed receiving the results of one
-			 * query from the server and dispatched them to the application)
-			 * but another query is queued; yield back control to caller so
-			 * that they can initiate processing of the next query in the
-			 * queue.
-			 */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
-				conn->cmd_queue_head != NULL)
-				return;
-
 			/*
 			 * Unexpected message in IDLE state; need to recover somehow.
 			 * ERROR messages are handled using the notice processor;
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 334aea4b6e..44a65e41b7 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -224,7 +224,8 @@ typedef enum
 								 * query */
 	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
-	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
+	PGASYNC_COPY_BOTH,			/* Copy In/Out data transfer in progress */
+	PGASYNC_PIPELINE_IDLE,		/* Pipeline mode */
 } PGAsyncStatusType;
 
 /* Target server type (decoded value of target_session_attrs) */
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index c27c4e0ada..2bdd4e308f 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -968,15 +968,29 @@ test_prepared(PGconn *conn)
 	fprintf(stderr, "ok\n");
 }
 
+/* Notice processor: print them, and count how many we got */
+static void
+notice_processor(void *arg, const char *message)
+{
+	int		   *n_notices = (int *) arg;
+
+	(*n_notices)++;
+	fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
+}
+
 static void
 test_simple_pipeline(PGconn *conn)
 {
 	PGresult   *res = NULL;
 	const char *dummy_params[1] = {"1"};
 	Oid			dummy_param_oids[1] = {INT4OID};
+	PQnoticeProcessor oldproc;
+	int			n_notices = 0;
 
 	fprintf(stderr, "simple pipeline... ");
 
+	oldproc = PQsetNoticeProcessor(conn, notice_processor, &n_notices);
+
 	/*
 	 * Enter pipeline mode and dispatch a set of operations, which we'll then
 	 * process the results of as they come in.
@@ -1052,6 +1066,91 @@ test_simple_pipeline(PGconn *conn)
 	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
 		pg_fatal("Exiting pipeline mode didn't seem to work");
 
+	if (n_notices > 0)
+		pg_fatal("unexpected notice");
+
+
+	/* Try the same thing with PQsendQuery */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("expected NULL result");
+
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	PQclear(res);
+	res = NULL;
+
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	/*
+	 * Must not have got any notices here; note bug as described in
+	 * https://postgr.es/m/ca+mi_8bvd0_cw3sumgwpvwdnzxy32itog_16tdyru_1s2gv...@mail.gmail.com
+	 */
+	if (n_notices > 0)
+		pg_fatal("got %d notice(s)", n_notices);
+
+	PQsetNoticeProcessor(conn, oldproc, NULL);
+
+	/*
+	 * Send a second command when libpq is in "pipeline-idle" state.
+	 */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	if (PQsendQuery(conn, "SELECT 2") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	/* read terminating null from first query */
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
 	fprintf(stderr, "ok\n");
 }
 
diff --git a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
index 5c94749bc1..e1c0cba07e 100644
--- a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
+++ b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
@@ -9,4 +9,41 @@ B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
 B 11 DataRow 1 1 '1'
 B 13 CommandComplete "SELECT 1"
 B 5 ReadyForQuery I
+F 16 Parse "" "SELECT 1" 0
+F 12 Bind "" "" 0 0 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 6 Close P ""
+F 4 Flush
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '1'
+B 13 CommandComplete "SELECT 1"
+F 4 Sync
+B 4 CloseComplete
+B 5 ReadyForQuery I
+F 16 Parse "" "SELECT 1" 0
+F 12 Bind "" "" 0 0 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 6 Close P ""
+F 4 Flush
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '1'
+B 13 CommandComplete "SELECT 1"
+F 16 Parse "" "SELECT 2" 0
+F 12 Bind "" "" 0 0 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 6 Close P ""
+F 4 Flush
+B 4 CloseComplete
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '2'
+B 13 CommandComplete "SELECT 1"
 F 4 Terminate
-- 
2.30.2

From 48ff379f2688ec4237d863a16ff0e23e668bc3a4 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 15 Jun 2022 19:42:23 +0200
Subject: [PATCH v5 2/2] Use Test::Differences if available

---
 src/test/modules/libpq_pipeline/README              |  3 +++
 .../modules/libpq_pipeline/t/001_libpq_pipeline.pl  | 13 ++++++++++++-
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/src/test/modules/libpq_pipeline/README b/src/test/modules/libpq_pipeline/README
index d8174dd579..59c6ea8109 100644
--- a/src/test/modules/libpq_pipeline/README
+++ b/src/test/modules/libpq_pipeline/README
@@ -1 +1,4 @@
 Test programs and libraries for libpq
+
+If you have Test::Differences installed, any differences in the trace files
+are displayed in a format that's easier to read than the standard format.
diff --git a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
index d8d496c995..49eec8a63a 100644
--- a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
+++ b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
@@ -9,6 +9,17 @@ use PostgresNode;
 use TestLib;
 use Test::More;
 
+# Use Test::Differences if installed, and select unified diff output.
+# No decent way to select a context line count with this;
+# we could use a sub ref to allow that.
+BEGIN
+{
+	if (!eval q{ use Test::Differences; unified_diff(); 1 })
+	{
+		*eq_or_diff = \&is;
+	}
+}
+
 my $node = get_new_node('main');
 $node->init;
 $node->start;
@@ -54,7 +65,7 @@ for my $testname (@tests)
 		$result = slurp_file_eval($traceout);
 		next unless $result ne "";
 
-		is($result, $expected, "$testname trace match");
+		eq_or_diff($result, $expected, "$testname trace match");
 	}
 }
 
-- 
2.30.2