Hello,

The attached patch enables pgbench to cancel queries during a benchmark.

Previously, pressing Ctrl+C during a benchmark killed pgbench immediately, but
backend processes executing long queries kept running for a while. You can
easily reproduce this problem by cancelling pgbench while it runs a custom
script that executes "SELECT pg_sleep(10)".

The patch fixes this so that cancel requests are sent for all connections on
Ctrl+C, and all running queries are cancelled before pgbench exits.

In thread #0, setup_cancel_handler is called before the loop, and the
CancelRequested flag is set when Ctrl+C is pressed. Inside the loop, cancel
requests are sent when the flag is set, but only by thread #0. SIGINT is
blocked in the other threads, which exit once their queries have been
cancelled. If thread safety is disabled or the OS is Windows, the signal
is not blocked because pthread_sigmask cannot be used.
(I haven't tested the patch on Windows yet, though.)

I chose the design in which the signal handler and query cancellation are
handled only in thread #0 because I wanted to make the behavior as
predictable as possible. However, an alternative design, in which every
thread can receive SIGINT and the first thread that catches the signal is
responsible for sending cancel requests for all connections, may also work.

Also, the CState array that holds the state of all clients has been changed
into a global variable so that every connection can be accessed from any thread.

Regards,
Yugo Nagata

-- 
Yugo NAGATA <nag...@sraoss.co.jp>
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1d1670d4c2..4e80afdf9b 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -606,6 +606,7 @@ typedef enum
 typedef struct
 {
 	PGconn	   *con;			/* connection handle to DB */
+	PGcancel   *cancel;			/* cancel handle for con, or NULL */
 	int			id;				/* client No. */
 	ConnectionStateEnum state;	/* state machine's current state. */
 	ConditionalStack cstack;	/* enclosing conditionals state */
@@ -648,6 +649,8 @@ typedef struct
 								 * here */
 } CState;
 
+static CState *all_state;	/* status of all clients, indexed by client id */
+
 /*
  * Thread state
  */
@@ -3639,6 +3642,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 						st->state = CSTATE_ABORTED;
 						break;
 					}
+					st->cancel = PQgetCancel(st->con);
 
 					/* reset now after connection */
 					now = pg_time_now();
@@ -4670,6 +4674,24 @@ disconnect_all(CState *state, int length)
 		finishCon(&state[i]);
 }
 
+/*
+ * Send cancel requests to all connections; used on the Ctrl+C path.
+ *
+ * This is best-effort: PQcancel() failures are ignored.  errbuf uses the
+ * 256-byte size that libpq recommends for PQcancel()'s error message.
+ */
+static void
+cancel_all(void)
+{
+	for (int i = 0; i < nclients; i++)
+	{
+		char		errbuf[256];
+
+		if (all_state[i].cancel != NULL)
+			(void) PQcancel(all_state[i].cancel, errbuf, sizeof(errbuf));
+	}
+}
+
 /*
  * Remove old pgbench tables, if any exist
  */
@@ -6607,7 +6629,6 @@ main(int argc, char **argv)
 	bool		initialization_option_set = false;
 	bool		internal_script_used = false;
 
-	CState	   *state;			/* status of clients */
 	TState	   *threads;		/* array of thread */
 
 	pg_time_usec_t
@@ -6656,7 +6677,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	state = (CState *) pg_malloc0(sizeof(CState));
+	all_state = (CState *) pg_malloc0(sizeof(CState));
 
 	/* set random seed early, because it may be used while parsing scripts. */
 	if (!set_random_seed(getenv("PGBENCH_RANDOM_SEED")))
@@ -6715,7 +6736,7 @@ main(int argc, char **argv)
 						pg_fatal("invalid variable definition: \"%s\"", optarg);
 
 					*p++ = '\0';
-					if (!putVariable(&state[0].variables, "option", optarg, p))
+					if (!putVariable(&all_state[0].variables, "option", optarg, p))
 						exit(1);
 				}
 				break;
@@ -7087,28 +7108,28 @@ main(int argc, char **argv)
 
 	if (nclients > 1)
 	{
-		state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
-		memset(state + 1, 0, sizeof(CState) * (nclients - 1));
+		all_state = (CState *) pg_realloc(all_state, sizeof(CState) * nclients);
+		memset(all_state + 1, 0, sizeof(CState) * (nclients - 1));
 
 		/* copy any -D switch values to all clients */
 		for (i = 1; i < nclients; i++)
 		{
 			int			j;
 
-			state[i].id = i;
-			for (j = 0; j < state[0].variables.nvars; j++)
+			all_state[i].id = i;
+			for (j = 0; j < all_state[0].variables.nvars; j++)
 			{
-				Variable   *var = &state[0].variables.vars[j];
+				Variable   *var = &all_state[0].variables.vars[j];
 
 				if (var->value.type != PGBT_NO_VALUE)
 				{
-					if (!putVariableValue(&state[i].variables, "startup",
+					if (!putVariableValue(&all_state[i].variables, "startup",
 										  var->name, &var->value))
 						exit(1);
 				}
 				else
 				{
-					if (!putVariable(&state[i].variables, "startup",
+					if (!putVariable(&all_state[i].variables, "startup",
 									 var->name, var->svalue))
 						exit(1);
 				}
@@ -7119,8 +7140,8 @@ main(int argc, char **argv)
 	/* other CState initializations */
 	for (i = 0; i < nclients; i++)
 	{
-		state[i].cstack = conditional_stack_create();
-		initRandomState(&state[i].cs_func_rs);
+		all_state[i].cstack = conditional_stack_create();
+		initRandomState(&all_state[i].cs_func_rs);
 	}
 
 	/* opening connection... */
@@ -7143,11 +7164,11 @@ main(int argc, char **argv)
 	 * :scale variables normally get -s or database scale, but don't override
 	 * an explicit -D switch
 	 */
-	if (lookupVariable(&state[0].variables, "scale") == NULL)
+	if (lookupVariable(&all_state[0].variables, "scale") == NULL)
 	{
 		for (i = 0; i < nclients; i++)
 		{
-			if (!putVariableInt(&state[i].variables, "startup", "scale", scale))
+			if (!putVariableInt(&all_state[i].variables, "startup", "scale", scale))
 				exit(1);
 		}
 	}
@@ -7156,29 +7177,29 @@ main(int argc, char **argv)
 	 * Define a :client_id variable that is unique per connection. But don't
 	 * override an explicit -D switch.
 	 */
-	if (lookupVariable(&state[0].variables, "client_id") == NULL)
+	if (lookupVariable(&all_state[0].variables, "client_id") == NULL)
 	{
 		for (i = 0; i < nclients; i++)
-			if (!putVariableInt(&state[i].variables, "startup", "client_id", i))
+			if (!putVariableInt(&all_state[i].variables, "startup", "client_id", i))
 				exit(1);
 	}
 
 	/* set default seed for hash functions */
-	if (lookupVariable(&state[0].variables, "default_seed") == NULL)
+	if (lookupVariable(&all_state[0].variables, "default_seed") == NULL)
 	{
 		uint64		seed = pg_prng_uint64(&base_random_sequence);
 
 		for (i = 0; i < nclients; i++)
-			if (!putVariableInt(&state[i].variables, "startup", "default_seed",
+			if (!putVariableInt(&all_state[i].variables, "startup", "default_seed",
 								(int64) seed))
 				exit(1);
 	}
 
 	/* set random seed unless overwritten */
-	if (lookupVariable(&state[0].variables, "random_seed") == NULL)
+	if (lookupVariable(&all_state[0].variables, "random_seed") == NULL)
 	{
 		for (i = 0; i < nclients; i++)
-			if (!putVariableInt(&state[i].variables, "startup", "random_seed",
+			if (!putVariableInt(&all_state[i].variables, "startup", "random_seed",
 								random_seed))
 				exit(1);
 	}
@@ -7209,7 +7230,7 @@ main(int argc, char **argv)
 		TState	   *thread = &threads[i];
 
 		thread->tid = i;
-		thread->state = &state[nclients_dealt];
+		thread->state = &all_state[nclients_dealt];
 		thread->nstate =
 			(nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
 		initRandomState(&thread->ts_choose_rs);
@@ -7299,7 +7320,7 @@ main(int argc, char **argv)
 	 * disconnect_all() will be a no-op, but clean up the connections just to
 	 * be sure. We don't need to measure the disconnection delays here.
 	 */
-	disconnect_all(state, nclients);
+	disconnect_all(all_state, nclients);
 
 	/*
 	 * Beware that performance of short benchmarks with many threads and
@@ -7334,6 +7355,25 @@ threadRun(void *arg)
 	StatsData	last,
 				aggs;
 
+	/*
+	 * Only thread #0 sets up the Ctrl+C handler, and only thread #0 sends
+	 * cancel requests for all connections (see cancel_all()).
+	 */
+	if (thread->tid == 0)
+		setup_cancel_handler(NULL);
+
+#if defined(ENABLE_THREAD_SAFETY) && !defined(WIN32)
+	if (thread->tid > 0)
+	{
+		sigset_t	sigint_sigset;
+
+		/* Block SIGINT in all threads except thread #0. */
+		sigemptyset(&sigint_sigset);
+		sigaddset(&sigint_sigset, SIGINT);
+		pthread_sigmask(SIG_BLOCK, &sigint_sigset, NULL);
+	}
+#endif
+
 	/* open log file if requested */
 	if (use_log)
 	{
@@ -7376,6 +7416,7 @@ threadRun(void *arg)
 				pg_fatal("could not create connection for client %d",
 						 state[i].id);
 			}
+			state[i].cancel = PQgetCancel(state[i].con);
 		}
 	}
 
@@ -7414,6 +7455,17 @@ threadRun(void *arg)
 		{
 			CState	   *st = &state[i];
 
+			/*
+			 * On Ctrl+C, thread #0 sends cancel requests for all clients, and
+			 * every thread stops processing its clients.
+			 */
+			if (CancelRequested)
+			{
+				if (thread->tid == 0)
+					cancel_all();
+				goto done;
+			}
+
 			if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
 			{
 				/* a nap from the script, or under throttling */
@@ -7602,6 +7654,20 @@ finishCon(CState *st)
 	{
 		PQfinish(st->con);
 		st->con = NULL;
+
+		/*
+		 * Free the PGcancel obtained from PQgetCancel() so that it is not
+		 * leaked on every reconnection.  The pointer is cleared first because
+		 * cancel_all() in thread #0 may read it; this narrows, but does not
+		 * close, that race.
+		 */
+		if (st->cancel != NULL)
+		{
+			PGcancel   *cancel = st->cancel;
+
+			st->cancel = NULL;
+			PQfreeCancel(cancel);
+		}
 	}
 }
 

Reply via email to