On 2023-Feb-13, Andres Freund wrote: > There's something wrong with the patch, it reliably fails with core dumps: > https://cirrus-ci.com/github/postgresql-cfbot/postgresql/commitfest%2F42%2F3260
I think this would happen on machines where sizeof(bool) is not 1 (mine is evidently not one of them). Fixed. In addition, there was the problem that the psprintf() calls that generate the command names would race against each other if you had multiple threads. I changed the code so that the name to prepare each statement under is generated when the Command struct is first initialized, which occurs before the threads are started. One small issue is that now we use a single counter for all commands of all scripts, rather than a script-local counter. This doesn't seem at all important. I did realize that Nagata-san was right that we've always prepared the whole script in advance; that behavior was already there in commit 49639a7b2c52, which introduced -Mprepared. We've never prepared each command just before executing it. -- Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/ Y una voz del caos me habló y me dijo "Sonríe y sé feliz, podría ser peor". Y sonreí. Y fui feliz. Y fue peor.
>From 0728193a5f02d0dd6a1f3ec5fef314aec646ba33 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Fri, 17 Feb 2023 21:01:15 +0100 Subject: [PATCH v8] pgbench: Prepare commands in pipelines in advance Failing to do so results in an error when a pgbench script starts a serializable transaction inside a pipeline. --- src/bin/pgbench/pgbench.c | 155 +++++++++++++------ src/bin/pgbench/t/001_pgbench_with_server.pl | 20 +++ 2 files changed, 126 insertions(+), 49 deletions(-) diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 508ed218e8..38e0830e7e 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -628,7 +628,8 @@ typedef struct pg_time_usec_t txn_begin; /* used for measuring schedule lag times */ pg_time_usec_t stmt_begin; /* used for measuring statement latencies */ - bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */ + /* whether client prepared each command of each script */ + bool **prepared; /* * For processing failures and repeating transactions with serialization @@ -733,12 +734,13 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"}; * argv Command arguments, the first of which is the command or SQL * string itself. For SQL commands, after post-processing * argv[0] is the same as 'lines' with variables substituted. - * varprefix SQL commands terminated with \gset or \aset have this set + * varprefix SQL commands terminated with \gset or \aset have this set * to a non NULL value. If nonempty, it's used to prefix the * variable name that receives the value. * aset do gset on all possible queries of a combined query (\;). * expr Parsed expression, if needed. * stats Time spent in this command. + * prepname The name that this command is prepared under, in prepare mode * retries Number of retries after a serialization or deadlock error in the * current command. * failures Number of errors in the current command that were not retried. 
@@ -754,6 +756,7 @@ typedef struct Command char *varprefix; PgBenchExpr *expr; SimpleStats stats; + char *prepname; int64 retries; int64 failures; } Command; @@ -3006,13 +3009,6 @@ runShellCommand(Variables *variables, char *variable, char **argv, int argc) return true; } -#define MAX_PREPARE_NAME 32 -static void -preparedStatementName(char *buffer, int file, int state) -{ - sprintf(buffer, "P%d_%d", file, state); -} - /* * Report the abortion of the client when processing SQL commands. */ @@ -3053,6 +3049,87 @@ chooseScript(TState *thread) return i - 1; } +/* + * Prepare the SQL command from st->use_file at command_num. + */ +static void +prepareCommand(CState *st, int command_num) +{ + Command *command = sql_script[st->use_file].commands[command_num]; + + /* No prepare for non-SQL commands */ + if (command->type != SQL_COMMAND) + return; + + /* + * If not already done, allocate space for 'prepared' flags: one boolean + * for each command of each script. + */ + if (!st->prepared) + { + st->prepared = pg_malloc(sizeof(bool *) * num_scripts); + for (int i = 0; i < num_scripts; i++) + { + ParsedScript *script = &sql_script[i]; + int numcmds; + + for (numcmds = 0; script->commands[numcmds] != NULL; numcmds++) + ; + st->prepared[i] = pg_malloc0(sizeof(bool) * numcmds); + } + } + + if (!st->prepared[st->use_file][command_num]) + { + PGresult *res; + + pg_log_debug("client %d preparing %s", st->id, command->prepname); + res = PQprepare(st->con, command->prepname, + command->argv[0], command->argc - 1, NULL); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_log_error("%s", PQerrorMessage(st->con)); + PQclear(res); + st->prepared[st->use_file][command_num] = true; + } +} + +/* + * Prepare all the commands in the script that come after the \startpipeline + * that's at position st->command, and the first \endpipeline we find. 
+ * + * (This sets the ->prepared flag for each relevant command, but doesn't move + * the st->command counter) + */ +static void +prepareCommandsInPipeline(CState *st) +{ + int j; + Command **commands = sql_script[st->use_file].commands; + + Assert(commands[st->command]->type == META_COMMAND && + commands[st->command]->meta == META_STARTPIPELINE); + + /* + * We set the 'prepared' flag on the \startpipeline itself to flag that we + * don't need to do this next time without calling prepareCommand(), even + * though we don't actually prepare this command. + */ + if (st->prepared && + st->prepared[st->use_file][st->command]) + return; + + for (j = st->command + 1; commands[j] != NULL; j++) + { + if (commands[j]->type == META_COMMAND && + commands[j]->meta == META_ENDPIPELINE) + break; + + prepareCommand(st, j); + } + + st->prepared[st->use_file][st->command] = true; +} + /* Send a SQL command, using the chosen querymode */ static bool sendCommand(CState *st, Command *command) @@ -3083,49 +3160,13 @@ sendCommand(CState *st, Command *command) } else if (querymode == QUERY_PREPARED) { - char name[MAX_PREPARE_NAME]; const char *params[MAX_ARGS]; - if (!st->prepared[st->use_file]) - { - int j; - Command **commands = sql_script[st->use_file].commands; - - for (j = 0; commands[j] != NULL; j++) - { - PGresult *res; - - if (commands[j]->type != SQL_COMMAND) - continue; - preparedStatementName(name, st->use_file, j); - if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF) - { - res = PQprepare(st->con, name, - commands[j]->argv[0], commands[j]->argc - 1, NULL); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pg_log_error("%s", PQerrorMessage(st->con)); - PQclear(res); - } - else - { - /* - * In pipeline mode, we use asynchronous functions. If a - * server-side error occurs, it will be processed later - * among the other results. 
- */ - if (!PQsendPrepare(st->con, name, - commands[j]->argv[0], commands[j]->argc - 1, NULL)) - pg_log_error("%s", PQerrorMessage(st->con)); - } - } - st->prepared[st->use_file] = true; - } - + prepareCommand(st, st->command); getQueryParams(&st->variables, command, params); - preparedStatementName(name, st->use_file, st->command); - pg_log_debug("client %d sending %s", st->id, name); - r = PQsendQueryPrepared(st->con, name, command->argc - 1, + pg_log_debug("client %d sending %s", st->id, command->prepname); + r = PQsendQueryPrepared(st->con, command->prepname, command->argc - 1, params, NULL, NULL, 0); } else /* unknown sql mode */ @@ -3597,7 +3638,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) thread->conn_duration += now - start; /* Reset session-local state */ - memset(st->prepared, 0, sizeof(st->prepared)); + pg_free(st->prepared); + st->prepared = NULL; } /* @@ -4360,6 +4402,16 @@ executeMetaCommand(CState *st, pg_time_usec_t *now) return CSTATE_ABORTED; } + /* + * If we're in prepared-query mode, we need to prepare all the + * commands that are inside the pipeline before we actually start the + * pipeline itself. This solves the problem that running BEGIN + * ISOLATION LEVEL SERIALIZABLE in a pipeline would fail due to a + * snapshot having been acquired by the prepare within the pipeline. 
+ */ + if (querymode == QUERY_PREPARED) + prepareCommandsInPipeline(st); + if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF) { commandFailed(st, "startpipeline", "already in pipeline mode"); @@ -5421,6 +5473,7 @@ create_sql_command(PQExpBuffer buf, const char *source) { Command *my_command; char *p = skip_sql_comments(buf->data); + static int prepnr = 0; if (p == NULL) return NULL; @@ -5439,6 +5492,10 @@ create_sql_command(PQExpBuffer buf, const char *source) my_command->varprefix = NULL; /* allocated later, if needed */ my_command->expr = NULL; initSimpleStats(&my_command->stats); + if (querymode == QUERY_PREPARED) + my_command->prepname = psprintf("P_%d", prepnr++); + else + my_command->prepname = NULL; return my_command; } diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl index 4bf508ea96..99273203f0 100644 --- a/src/bin/pgbench/t/001_pgbench_with_server.pl +++ b/src/bin/pgbench/t/001_pgbench_with_server.pl @@ -839,6 +839,26 @@ select 1 \gset f } }); +# Working \startpipeline in prepared query mode with serializable +$node->pgbench( + '-c4 -j2 -t 10 -n -M prepared', + 0, + [ + qr{type: .*/001_pgbench_pipeline_serializable}, + qr{actually processed: (\d+)/\1} + ], + [], + 'working \startpipeline with serializable', + { + '001_pgbench_pipeline_serializable' => q{ +-- test startpipeline with serializable +\startpipeline +BEGIN ISOLATION LEVEL SERIALIZABLE; +} . "select 1;\n" x 10 . q{ +END; +\endpipeline +} + }); # trigger many expression errors my @errors = ( -- 2.30.2