Hello pgdev,
This patch reworks and clarifies pgbench's internal state machine.
It was indirectly triggered by Heikki's review of the pgbench TAP tests
(https://commitfest.postgresql.org/19/1306/), and by Marina's patch about
transaction retry on some errors (https://commitfest.postgresql.org/19/1645/),
which I am reviewing.
- It adds more comments to the state enum definitions and to doCustom.
- There are some code cleanups/simplifications:
. a few useless intermediate variables are removed,
. a macro is added to replace a repeated pattern for lazily setting the current time,
. performance data are always collected, instead of trying to be clever
and skipping the collection of some data in some cases.
- More fundamentally, all state changes are now performed within doCustom;
previously one of them was performed by threadRun, which made understanding
the end of the run harder. Now threadRun only looks at the current state
to make decisions about a client.
- doCustom now always returns at the end of a script, to avoid an infinite
loop on a script made only of backslash commands. This replaces the previous
hack with a variable to detect such loops, which made it return only every
two script runs in such cases.
- There is a small behavioral change:
prior to this patch, a script would always run to its end once started,
with the exception of \sleep commands, which could be interrupted by
threadRun.
Marina's patch should somehow enforce one script = one transaction so
that a retry makes sense, so this exception is removed to avoid
implicitly aborting a transaction.
--
Fabien.

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 41b756c089..369e321196 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -267,14 +267,22 @@ typedef enum
* transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state
* sleeps until that moment. (If throttling is not enabled, doCustom()
* falls directly through from CSTATE_START_THROTTLE to
CSTATE_START_TX.)
+ *
+ * It may also detect that the next transaction would start beyond the
end
+ * of run, and switch to CSTATE_FINISHED.
*/
CSTATE_START_THROTTLE,
CSTATE_THROTTLE,
/*
* CSTATE_START_TX performs start-of-transaction processing.
Establishes
- * a new connection for the transaction, in --connect mode, and records
- * the transaction start time.
+ * a new connection for the transaction in --connect mode, records
+ * the transaction start time, and proceed to the first command.
+ *
+ * Note: once a script is started, it will either error or run till
+ * its end, where it may be interrupted. It is not interrupted while
+ * running, so pgbench --time is to be understood as tx are allowed to
+ * start in that time, and will finish when their work is completed.
*/
CSTATE_START_TX,
@@ -287,9 +295,6 @@ typedef enum
* and we enter the CSTATE_SLEEP state to wait for it to expire. Other
* meta-commands are executed immediately.
*
- * CSTATE_SKIP_COMMAND for conditional branches which are not executed,
- * quickly skip commands that do not need any evaluation.
- *
* CSTATE_WAIT_RESULT waits until we get a result set back from the
server
* for the current command.
*
@@ -297,19 +302,25 @@ typedef enum
*
* CSTATE_END_COMMAND records the end-of-command timestamp, increments
the
* command counter, and loops back to CSTATE_START_COMMAND state.
+ *
+ * CSTATE_SKIP_COMMAND is used by conditional branches which are not
+ * executed. It quickly skip commands that do not need any evaluation.
+ * This state can move forward several commands, till there is something
+ * to do or the end of the script.
*/
CSTATE_START_COMMAND,
- CSTATE_SKIP_COMMAND,
CSTATE_WAIT_RESULT,
CSTATE_SLEEP,
CSTATE_END_COMMAND,
+ CSTATE_SKIP_COMMAND,
/*
- * CSTATE_END_TX performs end-of-transaction processing. Calculates
- * latency, and logs the transaction. In --connect mode, closes the
- * current connection. Chooses the next script to execute and starts
over
- * in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we have
no
- * more work to do.
+ * CSTATE_END_TX performs end-of-transaction processing. It calculates
+ * latency, and logs the transaction. In --connect mode, it closes the
+ * current connection.
+ *
+ * Then either starts over in CSTATE_CHOOSE_SCRIPT, or enters
CSTATE_FINISHED
+ * if we have no more work to do.
*/
CSTATE_END_TX,
@@ -2679,16 +2690,13 @@ evaluateSleep(CState *st, int argc, char **argv, int
*usecs)
/*
* Advance the state machine of a connection, if possible.
+ *
+ * All state changes are performed within this function called
+ * by threadRun.
*/
static void
doCustom(TState *thread, CState *st, StatsData *agg)
{
- PGresult *res;
- Command *command;
- instr_time now;
- bool end_tx_processed = false;
- int64 wait;
-
/*
* gettimeofday() isn't free, so we get the current timestamp lazily the
* first time it's needed, and reuse the same value throughout this
@@ -2697,37 +2705,44 @@ doCustom(TState *thread, CState *st, StatsData *agg)
* means "not set yet". Reset "now" when we execute shell commands or
* expressions, which might take a non-negligible amount of time,
though.
*/
+ instr_time now;
INSTR_TIME_SET_ZERO(now);
/*
* Loop in the state machine, until we have to wait for a result from
the
- * server (or have to sleep, for throttling or for \sleep).
+ * server or have to sleep for throttling or \sleep.
*
* Note: In the switch-statement below, 'break' will loop back here,
* meaning "continue in the state machine". Return is used to return to
- * the caller.
+ * the caller, giving the thread opportunity to move forward another
client.
*/
for (;;)
{
+ PGresult *res;
+ Command *command;
+
switch (st->state)
{
/*
* Select transaction to run.
*/
case CSTATE_CHOOSE_SCRIPT:
-
st->use_file = chooseScript(thread);
if (debug)
fprintf(stderr, "client %d executing
script \"%s\"\n", st->id,
sql_script[st->use_file].desc);
- if (throttle_delay > 0)
+ /* check stack consistency */
+ Assert(conditional_stack_empty(st->cstack));
+
+ if (timer_exceeded)
+ st->state = CSTATE_FINISHED;
+ else if (throttle_delay > 0)
st->state = CSTATE_START_THROTTLE;
else
st->state = CSTATE_START_TX;
- /* check consistency */
- Assert(conditional_stack_empty(st->cstack));
+
break;
/*
@@ -2745,21 +2760,10 @@ doCustom(TState *thread, CState *st, StatsData *agg)
* away.
*/
Assert(throttle_delay > 0);
- wait = getPoissonRand(thread, throttle_delay);
- thread->throttle_trigger += wait;
+ thread->throttle_trigger +=
getPoissonRand(thread, throttle_delay);
st->txn_scheduled = thread->throttle_trigger;
- /*
- * stop client if next transaction is beyond
pgbench end of
- * execution
- */
- if (duration > 0 && st->txn_scheduled >
end_time)
- {
- st->state = CSTATE_FINISHED;
- break;
- }
-
/*
* If --latency-limit is used, and this slot is
already late
* so that the transaction will miss the
latency limit even if
@@ -2771,19 +2775,19 @@ doCustom(TState *thread, CState *st, StatsData *agg)
{
int64 now_us;
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
+ INSTR_TIME_SET_CURRENT_LAZY(now);
now_us = INSTR_TIME_GET_MICROSEC(now);
+
while (thread->throttle_trigger <
now_us - latency_limit &&
(nxacts <= 0 || st->cnt <
nxacts))
{
processXactStats(thread, st,
&now, true, agg);
/* next rendez-vous */
- wait = getPoissonRand(thread,
throttle_delay);
- thread->throttle_trigger +=
wait;
+ thread->throttle_trigger +=
getPoissonRand(thread, throttle_delay);
st->txn_scheduled =
thread->throttle_trigger;
}
- /* stop client if -t exceeded */
+
+ /* stop client if -t was exceeded in
the previous skip loop */
if (nxacts > 0 && st->cnt >= nxacts)
{
st->state = CSTATE_FINISHED;
@@ -2791,38 +2795,45 @@ doCustom(TState *thread, CState *st, StatsData *agg)
}
}
+ /*
+ * stop client if next transaction is beyond
pgbench end of
+ * execution.
+ */
+ if (duration > 0 && st->txn_scheduled >
end_time)
+ {
+ st->state = CSTATE_FINISHED;
+ break;
+ }
+
st->state = CSTATE_THROTTLE;
- if (debug)
- fprintf(stderr, "client %d throttling "
INT64_FORMAT " us\n",
- st->id, wait);
break;
/*
* Wait until it's time to start next
transaction.
*/
case CSTATE_THROTTLE:
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
+
+ INSTR_TIME_SET_CURRENT_LAZY(now);
+
if (INSTR_TIME_GET_MICROSEC(now) <
st->txn_scheduled)
- return; /* Still sleeping,
nothing to do here */
+ return; /* still sleeping,
nothing to do here */
- /* Else done sleeping, start the transaction */
- st->state = CSTATE_START_TX;
+ /* done sleeping, but do not start if
transaction if we are done */
+ if (timer_exceeded)
+ st->state = CSTATE_FINISHED;
+ else
+ st->state = CSTATE_START_TX;
break;
/* Start new transaction */
case CSTATE_START_TX:
- /*
- * Establish connection on first call, or if
is_connect is
- * true.
- */
+ /* establish connection if needed, i.e. under
--connect */
if (st->con == NULL)
{
instr_time start;
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
+ INSTR_TIME_SET_CURRENT_LAZY(now);
start = now;
if ((st->con = doConnect()) == NULL)
{
@@ -2838,28 +2849,20 @@ doCustom(TState *thread, CState *st, StatsData *agg)
memset(st->prepared, 0,
sizeof(st->prepared));
}
+ /* record transaction start time. */
+ INSTR_TIME_SET_CURRENT_LAZY(now);
+ st->txn_begin = now;
+
/*
- * Record transaction start time under logging,
progress or
- * throttling.
+ * When not throttling, this is also the
transaction's
+ * scheduled start time.
*/
- if (use_log || progress || throttle_delay ||
latency_limit ||
- per_script_stats)
- {
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
- st->txn_begin = now;
-
- /*
- * When not throttling, this is also
the transaction's
- * scheduled start time.
- */
- if (!throttle_delay)
- st->txn_scheduled =
INSTR_TIME_GET_MICROSEC(now);
- }
+ if (!throttle_delay)
+ st->txn_scheduled =
INSTR_TIME_GET_MICROSEC(now);
/* Begin with the first command */
- st->command = 0;
st->state = CSTATE_START_COMMAND;
+ st->command = 0;
break;
/*
@@ -2878,17 +2881,11 @@ doCustom(TState *thread, CState *st, StatsData *agg)
break;
}
- /*
- * Record statement start time if per-command
latencies are
- * requested
- */
- if (is_latencies)
- {
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
- st->stmt_begin = now;
- }
+ /* record statement start time. */
+ INSTR_TIME_SET_CURRENT_LAZY(now);
+ st->stmt_begin = now;
+ /* execute the command */
if (command->type == SQL_COMMAND)
{
if (!sendCommand(st, command))
@@ -2931,8 +2928,8 @@ doCustom(TState *thread, CState *st, StatsData *agg)
break;
}
- if (INSTR_TIME_IS_ZERO(now))
-
INSTR_TIME_SET_CURRENT(now);
+
INSTR_TIME_SET_CURRENT_LAZY(now);
+
st->sleep_until =
INSTR_TIME_GET_MICROSEC(now) + usec;
st->state = CSTATE_SLEEP;
break;
@@ -2983,10 +2980,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
}
else /* elif */
{
- /*
- * we should
get here only if the "elif"
- * needed
evaluation
- */
+ /* we should
get here only if the "elif" needed evaluation */
Assert(conditional_stack_peek(st->cstack) == IFSTATE_FALSE);
conditional_stack_poke(st->cstack, cond ? IFSTATE_TRUE : IFSTATE_FALSE);
}
@@ -3018,43 +3012,23 @@ doCustom(TState *thread, CState *st, StatsData *agg)
}
else if (command->meta == META_SETSHELL)
{
- bool ret =
runShellCommand(st, argv[1], argv + 2, argc - 2);
-
- if (timer_exceeded) /* timeout
*/
- {
- st->state =
CSTATE_FINISHED;
- break;
- }
- else if (!ret) /* on error */
+ if (!runShellCommand(st,
argv[1], argv + 2, argc - 2))
{
commandFailed(st,
"setshell", "execution of meta-command failed");
st->state =
CSTATE_ABORTED;
break;
}
- else
- {
- /* succeeded */
- }
+ /* else success */
}
else if (command->meta == META_SHELL)
{
- bool ret =
runShellCommand(st, NULL, argv + 1, argc - 1);
-
- if (timer_exceeded) /* timeout
*/
- {
- st->state =
CSTATE_FINISHED;
- break;
- }
- else if (!ret) /* on error */
+ if (!runShellCommand(st, NULL,
argv + 1, argc - 1))
{
commandFailed(st,
"shell", "execution of meta-command failed");
st->state =
CSTATE_ABORTED;
break;
}
- else
- {
- /* succeeded */
- }
+ /* else success */
}
move_to_end_command:
@@ -3156,6 +3130,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
}
if (st->state != CSTATE_SKIP_COMMAND)
+ /* out of quick skip command
loop */
break;
}
break;
@@ -3205,10 +3180,9 @@ doCustom(TState *thread, CState *st, StatsData *agg)
* instead of CSTATE_START_TX.
*/
case CSTATE_SLEEP:
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
+ INSTR_TIME_SET_CURRENT_LAZY(now);
if (INSTR_TIME_GET_MICROSEC(now) <
st->sleep_until)
- return; /* Still sleeping,
nothing to do here */
+ return; /* still sleeping,
nothing to do here */
/* Else done sleeping. */
st->state = CSTATE_END_COMMAND;
break;
@@ -3223,17 +3197,13 @@ doCustom(TState *thread, CState *st, StatsData *agg)
* in thread-local data structure, if
per-command latencies
* are requested.
*/
- if (is_latencies)
- {
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
+ INSTR_TIME_SET_CURRENT_LAZY(now);
- /* XXX could use a mutex here, but we
choose not to */
- command =
sql_script[st->use_file].commands[st->command];
- addToSimpleStats(&command->stats,
-
INSTR_TIME_GET_DOUBLE(now) -
-
INSTR_TIME_GET_DOUBLE(st->stmt_begin));
- }
+ /* XXX could use a mutex here, but we choose
not to */
+ command =
sql_script[st->use_file].commands[st->command];
+ addToSimpleStats(&command->stats,
+
INSTR_TIME_GET_DOUBLE(now) -
+
INSTR_TIME_GET_DOUBLE(st->stmt_begin));
/* Go ahead with next command, to be executed
or skipped */
st->command++;
@@ -3242,19 +3212,15 @@ doCustom(TState *thread, CState *st, StatsData *agg)
break;
/*
- * End of transaction.
+ * End of transaction (end of script, really).
*/
case CSTATE_END_TX:
/* transaction finished: calculate latency and
do log */
processXactStats(thread, st, &now, false, agg);
- /* conditional stack must be empty */
- if (!conditional_stack_empty(st->cstack))
- {
- fprintf(stderr, "end of script reached
within a conditional, missing \\endif\n");
- exit(1);
- }
+ /* missing \endif... cannot happen if
CheckConditional was okay */
+ Assert(conditional_stack_empty(st->cstack));
if (is_connect)
{
@@ -3268,26 +3234,17 @@ doCustom(TState *thread, CState *st, StatsData *agg)
st->state = CSTATE_FINISHED;
break;
}
+ else
+ {
+ /* next transaction */
+ st->state = CSTATE_CHOOSE_SCRIPT;
- /*
- * No transaction is underway anymore.
- */
- st->state = CSTATE_CHOOSE_SCRIPT;
-
- /*
- * If we paced through all commands in the
script in this
- * loop, without returning to the caller even
once, do it now.
- * This gives the thread a chance to process
other
- * connections, and to do progress reporting.
This can
- * currently only happen if the script consists
entirely of
- * meta-commands.
- */
- if (end_tx_processed)
+ /*
+ * Ensure that we always return on this
point, so as
+ * to avoid an infinite loop if the
script only contains
+ * meta commands.
+ */
return;
- else
- {
- end_tx_processed = true;
- break;
}
/*
@@ -3401,8 +3358,7 @@ processXactStats(TState *thread, CState *st, instr_time
*now,
if (detailed && !skipped)
{
- if (INSTR_TIME_IS_ZERO(*now))
- INSTR_TIME_SET_CURRENT(*now);
+ INSTR_TIME_SET_CURRENT_LAZY(*now);
/* compute latency & lag */
latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
@@ -5652,7 +5608,7 @@ threadRun(void *arg)
if (!is_connect)
{
- /* make connections to the database */
+ /* make connections to the database before starting */
for (i = 0; i < nstate; i++)
{
if ((state[i].con = doConnect()) == NULL)
@@ -5686,14 +5642,7 @@ threadRun(void *arg)
{
CState *st = &state[i];
- if (st->state == CSTATE_THROTTLE && timer_exceeded)
- {
- /* interrupt client that has not started a
transaction */
- st->state = CSTATE_FINISHED;
- finishCon(st);
- remains--;
- }
- else if (st->state == CSTATE_SLEEP || st->state ==
CSTATE_THROTTLE)
+ if (st->state == CSTATE_SLEEP || st->state ==
CSTATE_THROTTLE)
{
/* a nap from the script, or under throttling */
int64 this_usec;
diff --git a/src/include/portability/instr_time.h
b/src/include/portability/instr_time.h
index f968444671..c46f6825bb 100644
--- a/src/include/portability/instr_time.h
+++ b/src/include/portability/instr_time.h
@@ -20,6 +20,8 @@
*
* INSTR_TIME_SET_CURRENT(t) set t to current time
*
+ * INSTR_TIME_SET_CURRENT_LAZY(t) set t to current time if t is zero
+ *
* INSTR_TIME_ADD(x, y) x += y
*
* INSTR_TIME_SUBTRACT(x, y) x -= y
@@ -245,4 +247,9 @@ GetTimerFrequency(void)
#endif /* WIN32 */
+/* same on all platforms; do/while makes it a single statement (if/else safe) */
+#define INSTR_TIME_SET_CURRENT_LAZY(t) \
+	do { if (INSTR_TIME_IS_ZERO(t)) \
+		INSTR_TIME_SET_CURRENT(t); } while (0)
+
#endif /* INSTR_TIME_H */