Hello, the attached patch enables pgbench to cancel running queries when a benchmark is interrupted.
Formerly, Ctrl+C during a benchmark killed pgbench immediately, but backend processes executing long queries remained for a while. You can easily reproduce this problem by cancelling a pgbench run that uses a custom script executing "SELECT pg_sleep(10)". The patch fixes this so that cancel requests are sent for all connections on Ctrl+C, and all running queries are cancelled before pgbench exits. In thread #0, setup_cancel_handler is called before the loop, and the CancelRequested flag is set when Ctrl+C is issued. In the loop, cancel requests are sent when the flag is set, but only in thread #0. SIGINT is blocked in the other threads, but those threads will exit after their queries are cancelled. If thread safety is disabled or the OS is Windows, the signal is not blocked because pthread_sigmask cannot be used. (I haven't tested the patch on Windows yet, though.) I chose the design in which the signal handler and the query cancel are performed only in thread #0 because I wanted to make the behavior as predictable as possible. However, another design, in which all threads can receive SIGINT and the first thread that catches the signal is responsible for sending cancel requests for all connections, may also work. Also, the array of CState that contains the state of all clients is changed to a global variable so that all connections can be accessed from within a single thread. Regards, Yugo Nagata -- Yugo NAGATA <nag...@sraoss.co.jp>
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 1d1670d4c2..4e80afdf9b 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -606,6 +606,7 @@ typedef enum typedef struct { PGconn *con; /* connection handle to DB */ + PGcancel *cancel; /* query cancel */ int id; /* client No. */ ConnectionStateEnum state; /* state machine's current state. */ ConditionalStack cstack; /* enclosing conditionals state */ @@ -648,6 +649,8 @@ typedef struct * here */ } CState; +CState *all_state; /* status of all clients */ + /* * Thread state */ @@ -3639,6 +3642,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) st->state = CSTATE_ABORTED; break; } + st->cancel = PQgetCancel(st->con); /* reset now after connection */ now = pg_time_now(); @@ -4670,6 +4674,18 @@ disconnect_all(CState *state, int length) finishCon(&state[i]); } +/* send cancel requests to all connections */ +static void +cancel_all() +{ + for (int i = 0; i < nclients; i++) + { + char errbuf[1]; + if (all_state[i].cancel != NULL) + (void) PQcancel(all_state[i].cancel, errbuf, sizeof(errbuf)); + } +} + /* * Remove old pgbench tables, if any exist */ @@ -6607,7 +6623,6 @@ main(int argc, char **argv) bool initialization_option_set = false; bool internal_script_used = false; - CState *state; /* status of clients */ TState *threads; /* array of thread */ pg_time_usec_t @@ -6656,7 +6671,7 @@ main(int argc, char **argv) } } - state = (CState *) pg_malloc0(sizeof(CState)); + all_state = (CState *) pg_malloc0(sizeof(CState)); /* set random seed early, because it may be used while parsing scripts. 
*/ if (!set_random_seed(getenv("PGBENCH_RANDOM_SEED"))) @@ -6715,7 +6730,7 @@ main(int argc, char **argv) pg_fatal("invalid variable definition: \"%s\"", optarg); *p++ = '\0'; - if (!putVariable(&state[0].variables, "option", optarg, p)) + if (!putVariable(&all_state[0].variables, "option", optarg, p)) exit(1); } break; @@ -7087,28 +7102,28 @@ main(int argc, char **argv) if (nclients > 1) { - state = (CState *) pg_realloc(state, sizeof(CState) * nclients); - memset(state + 1, 0, sizeof(CState) * (nclients - 1)); + all_state = (CState *) pg_realloc(all_state, sizeof(CState) * nclients); + memset(all_state + 1, 0, sizeof(CState) * (nclients - 1)); /* copy any -D switch values to all clients */ for (i = 1; i < nclients; i++) { int j; - state[i].id = i; - for (j = 0; j < state[0].variables.nvars; j++) + all_state[i].id = i; + for (j = 0; j < all_state[0].variables.nvars; j++) { - Variable *var = &state[0].variables.vars[j]; + Variable *var = &all_state[0].variables.vars[j]; if (var->value.type != PGBT_NO_VALUE) { - if (!putVariableValue(&state[i].variables, "startup", + if (!putVariableValue(&all_state[i].variables, "startup", var->name, &var->value)) exit(1); } else { - if (!putVariable(&state[i].variables, "startup", + if (!putVariable(&all_state[i].variables, "startup", var->name, var->svalue)) exit(1); } @@ -7119,8 +7134,8 @@ main(int argc, char **argv) /* other CState initializations */ for (i = 0; i < nclients; i++) { - state[i].cstack = conditional_stack_create(); - initRandomState(&state[i].cs_func_rs); + all_state[i].cstack = conditional_stack_create(); + initRandomState(&all_state[i].cs_func_rs); } /* opening connection... 
*/ @@ -7143,11 +7158,11 @@ main(int argc, char **argv) * :scale variables normally get -s or database scale, but don't override * an explicit -D switch */ - if (lookupVariable(&state[0].variables, "scale") == NULL) + if (lookupVariable(&all_state[0].variables, "scale") == NULL) { for (i = 0; i < nclients; i++) { - if (!putVariableInt(&state[i].variables, "startup", "scale", scale)) + if (!putVariableInt(&all_state[i].variables, "startup", "scale", scale)) exit(1); } } @@ -7156,29 +7171,29 @@ main(int argc, char **argv) * Define a :client_id variable that is unique per connection. But don't * override an explicit -D switch. */ - if (lookupVariable(&state[0].variables, "client_id") == NULL) + if (lookupVariable(&all_state[0].variables, "client_id") == NULL) { for (i = 0; i < nclients; i++) - if (!putVariableInt(&state[i].variables, "startup", "client_id", i)) + if (!putVariableInt(&all_state[i].variables, "startup", "client_id", i)) exit(1); } /* set default seed for hash functions */ - if (lookupVariable(&state[0].variables, "default_seed") == NULL) + if (lookupVariable(&all_state[0].variables, "default_seed") == NULL) { uint64 seed = pg_prng_uint64(&base_random_sequence); for (i = 0; i < nclients; i++) - if (!putVariableInt(&state[i].variables, "startup", "default_seed", + if (!putVariableInt(&all_state[i].variables, "startup", "default_seed", (int64) seed)) exit(1); } /* set random seed unless overwritten */ - if (lookupVariable(&state[0].variables, "random_seed") == NULL) + if (lookupVariable(&all_state[0].variables, "random_seed") == NULL) { for (i = 0; i < nclients; i++) - if (!putVariableInt(&state[i].variables, "startup", "random_seed", + if (!putVariableInt(&all_state[i].variables, "startup", "random_seed", random_seed)) exit(1); } @@ -7209,7 +7224,7 @@ main(int argc, char **argv) TState *thread = &threads[i]; thread->tid = i; - thread->state = &state[nclients_dealt]; + thread->state = &all_state[nclients_dealt]; thread->nstate = (nclients - nclients_dealt + 
nthreads - i - 1) / (nthreads - i); initRandomState(&thread->ts_choose_rs); @@ -7299,7 +7314,7 @@ main(int argc, char **argv) * disconnect_all() will be a no-op, but clean up the connections just to * be sure. We don't need to measure the disconnection delays here. */ - disconnect_all(state, nclients); + disconnect_all(all_state, nclients); /* * Beware that performance of short benchmarks with many threads and @@ -7334,6 +7349,22 @@ threadRun(void *arg) StatsData last, aggs; + if (thread->tid == 0) + setup_cancel_handler(NULL); + +#if defined(ENABLE_THREAD_SAFETY) && !defined(WIN32) + if (thread->tid > 0) + { + sigset_t sigint_sigset; + sigset_t osigset; + sigemptyset(&sigint_sigset); + sigaddset(&sigint_sigset, SIGINT); + + /* Block SIGINT in all threads except one. */ + pthread_sigmask(SIG_BLOCK, &sigint_sigset, &osigset); + } +#endif + /* open log file if requested */ if (use_log) { @@ -7376,6 +7407,7 @@ threadRun(void *arg) pg_fatal("could not create connection for client %d", state[i].id); } + state[i].cancel = PQgetCancel(state[i].con); } } @@ -7414,6 +7446,13 @@ threadRun(void *arg) { CState *st = &state[i]; + if (CancelRequested) + { + if (thread->tid == 0) + cancel_all(); + goto done; + } + if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE) { /* a nap from the script, or under throttling */ @@ -7602,6 +7641,7 @@ finishCon(CState *st) { PQfinish(st->con); st->con = NULL; + st->cancel = NULL; } }