On Wed, 2 Aug 2023 16:37:53 +0900
Yugo NAGATA <[email protected]> wrote:
> Hello Fabien,
>
> On Fri, 14 Jul 2023 20:32:01 +0900
> Yugo NAGATA <[email protected]> wrote:
>
> I attached the updated patch.
I'm sorry. I forgot to attach the patch.
Regards,
Yugo Nagata
>
> > Hello Fabien,
> >
> > Thank you for your review!
> >
> > On Mon, 3 Jul 2023 20:39:23 +0200 (CEST)
> > Fabien COELHO <[email protected]> wrote:
> >
> > >
> > > Yugo-san,
> > >
> > > Some feedback about v1 of this patch.
> > >
> > > Patch applies cleanly, compiles.
> > >
> > > There are no tests, could there be one? ISTM that one could be done with
> > > a
> > > "SELECT pg_sleep(...)" script??
> >
> > Agreed. I will add the test.
>
> I added a TAP test.
>
> >
> > > The global name "all_state" is quite ambiguous, what about
> > > "client_states"
> > > instead? Or maybe it could be avoided, see below.
> > >
> > > Instead of renaming "state" to "all_state" (or client_states as suggested
> > > above), I'd suggest to minimize the patch by letting "state" inside the
> > > main and adding a "client_states = state;" just after the allocation, or
> > > another approach, see below.
> >
> > Ok, I'll fix to add a global variable "client_states" and make this point to
> > "state" instead of changing "state" to global.
>
> Done.
>
> >
> > > Should PQfreeCancel be called on disconnections, in finishCon? I think
> > > that
> > > there may be a memory leak with the current implementation??
> >
> > Agreed. I'll fix.
>
> Done.
>
> Regards,
> Yugo Nagata
>
> >
> > > Maybe it should check that cancel is not NULL before calling PQcancel?
> >
> > I think this is already checked as below, but am I missing something?
> >
> > + if (all_state[i].cancel != NULL)
> > + (void) PQcancel(all_state[i].cancel, errbuf, sizeof(errbuf));
> >
> > > After looking at the code, I'm very unclear whether they may be some
> > > underlying race conditions, or not, depending on when the cancel is
> > > triggered. I think that some race conditions are still likely given the
> > > current thread 0 implementation, and dealing with them with a barrier or
> > > whatever is not desirable at all.
> > >
> > > In order to work around this issue, ISTM that we should go away from the
> > > simple and straightforward thread 0 approach, and the only clean way is
> > > that the cancelation should be managed by each thread for its own client.
> > >
> > > I'd suggest to have the advanceState to call PQcancel when
> > > CancelRequested
> > > is set and switch to CSTATE_ABORTED to end properly. This means that
> > > there
> > > would be no need for the global client states pointer, so the patch
> > > should
> > > be smaller and simpler. Possibly there would be some shortcuts added here
> > > and there to avoid lingering after the control-C, in threadRun.
> >
> > I am not sure this approach is simpler than mine.
> >
> > In a multi-threaded program, only one thread catches the signal while the
> > other threads continue to run. Therefore, if Ctrl+C is pressed while the
> > threads are waiting for responses from the backend in wait_on_socket_set,
> > only that one thread is interrupted and returns, while the other threads
> > keep waiting and never get a chance to check CancelRequested. So, to
> > implement your suggestion, we would need some hack to make all threads
> > return from wait_on_socket_set when the event occurs, but I don't see a
> > simple way to do this.
> >
> > In my patch, all threads return from wait_on_socket_set on Ctrl+C
> > because, when thread #0 cancels all the connections, the following error
> > is sent to all sessions:
> >
> > ERROR: canceling statement due to user request
> >
> > so every thread receives a response from the backend.
> >
> > Regards,
> > Yugo Nagata
> >
> > --
> > Yugo NAGATA <[email protected]>
> >
> >
>
>
> --
> Yugo NAGATA <[email protected]>
>
>
--
Yugo NAGATA <[email protected]>
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 539c2795e2..68278d2b18 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -596,6 +596,7 @@ typedef enum
typedef struct
{
PGconn *con; /* connection handle to DB */
+ PGcancel *cancel; /* query cancel */
int id; /* client No. */
ConnectionStateEnum state; /* state machine's current state. */
ConditionalStack cstack; /* enclosing conditionals state */
@@ -638,6 +639,8 @@ typedef struct
* here */
} CState;
+CState *client_states; /* status of all clients */
+
/*
* Thread state
*/
@@ -3631,6 +3634,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
st->state = CSTATE_ABORTED;
break;
}
+ st->cancel = PQgetCancel(st->con);
/* reset now after connection */
now = pg_time_now();
@@ -4662,6 +4666,18 @@ disconnect_all(CState *state, int length)
finishCon(&state[i]);
}
+/* send cancel requests to all connections */
+static void
+cancel_all(void)
+{
+ for (int i = 0; i < nclients; i++)
+ {
+ char errbuf[1];
+ if (client_states[i].cancel != NULL)
+ (void) PQcancel(client_states[i].cancel, errbuf, sizeof(errbuf));
+ }
+}
+
/*
* Remove old pgbench tables, if any exist
*/
@@ -7133,6 +7149,9 @@ main(int argc, char **argv)
}
}
+ /* enable threads to access the status of all clients */
+ client_states = state;
+
/* other CState initializations */
for (i = 0; i < nclients; i++)
{
@@ -7345,6 +7364,22 @@ threadRun(void *arg)
StatsData last,
aggs;
+ if (thread->tid == 0)
+ setup_cancel_handler(NULL);
+
+#if defined(ENABLE_THREAD_SAFETY) && !defined(WIN32)
+ if (thread->tid > 0)
+ {
+ sigset_t sigint_sigset;
+ sigset_t osigset;
+ sigemptyset(&sigint_sigset);
+ sigaddset(&sigint_sigset, SIGINT);
+
+ /* Block SIGINT in all threads except one. */
+ pthread_sigmask(SIG_BLOCK, &sigint_sigset, &osigset);
+ }
+#endif
+
/* open log file if requested */
if (use_log)
{
@@ -7387,6 +7422,7 @@ threadRun(void *arg)
pg_fatal("could not create connection for client %d",
state[i].id);
}
+ state[i].cancel = PQgetCancel(state[i].con);
}
}
@@ -7414,6 +7450,17 @@ threadRun(void *arg)
pg_time_usec_t min_usec;
pg_time_usec_t now = 0; /* set this only if needed */
+ /*
+ * If pgbench is canceled, send cancel requests to all connections
+ * and exit the benchmark.
+ */
+ if (CancelRequested)
+ {
+ if (thread->tid == 0)
+ cancel_all();
+ goto done;
+ }
+
/*
* identify which client sockets should be checked for input, and
* compute the nearest time (if any) at which we need to wake up.
@@ -7613,6 +7660,8 @@ finishCon(CState *st)
{
PQfinish(st->con);
st->con = NULL;
+ PQfreeCancel(st->cancel);
+ st->cancel = NULL;
}
}
diff --git a/src/bin/pgbench/t/003_pgbench_cancel.pl b/src/bin/pgbench/t/003_pgbench_cancel.pl
new file mode 100644
index 0000000000..f5619a687c
--- /dev/null
+++ b/src/bin/pgbench/t/003_pgbench_cancel.pl
@@ -0,0 +1,84 @@
+
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+my $tempdir = PostgreSQL::Test::Utils::tempdir;
+
+my $node = PostgreSQL::Test::Cluster->new('main');
+$node->init;
+$node->start;
+
+# Test query canceling by sending SIGINT to a running pgbench
+#
+# There is, as of this writing, no documented way to get the PID of
+# the process from IPC::Run. As a workaround, we have pgbench print its
+# own PID (which is the parent of the shell launched by pgbench) to a
+# file.
+SKIP:
+{
+ skip "cancel test requires a Unix shell", 2 if $windows_os;
+
+ local %ENV = $node->_get_env();
+
+ my ($stdin, $stdout, $stderr, @file);
+
+ # Test whether shell supports $PPID. It's part of POSIX, but some
+ # pre-/non-POSIX shells don't support it (e.g., NetBSD).
+ $stdin = "\\! echo \$PPID";
+ IPC::Run::run([ 'psql', '-X', '-v', 'ON_ERROR_STOP=1' ],
+ '<', \$stdin, '>', \$stdout, '2>', \$stderr);
+ $stdout =~ /^\d+$/ or skip "shell apparently does not support \$PPID", 3;
+
+ @file = $node->_pgbench_make_files(
+ {
+ '003_pgbench_cancel' => qq{
+\\shell echo \$PPID >$tempdir/pgbench.pid
+select pg_sleep($PostgreSQL::Test::Utils::timeout_default);
+ }});
+
+ # Now start the real test
+ my $h = IPC::Run::start([ 'pgbench', '-c', '4', '-j', '4', '-T', "$PostgreSQL::Test::Utils::timeout_default", @file ],
+ \$stdin, \$stdout, \$stderr);
+
+ # Get the PID
+ my $count;
+ my $pgbench_pid;
+ until (
+ -s "$tempdir/pgbench.pid"
+ and ($pgbench_pid =
+ PostgreSQL::Test::Utils::slurp_file("$tempdir/pgbench.pid")) =~
+ /^\d+\n/s)
+ {
+ ($count++ < 100 * $PostgreSQL::Test::Utils::timeout_default)
+ or die "pid file did not appear";
+ usleep(10_000);
+ }
+
+ $node->poll_query_until('postgres',
+ q{SELECT (SELECT count(*) FROM pg_stat_activity WHERE query ~ '^select pg_sleep') = 4;}
+ ) or die "timed out";
+
+ # Send cancel request
+ kill 'INT', $pgbench_pid;
+
+ my $result = finish $h;
+
+ ok(!$result, 'query failed as expected');
+ like(
+ $stderr,
+ qr/Run was aborted; the above results are incomplete/,
+ 'pgbench was canceled');
+
+ is($node->safe_psql('postgres',
+ q{SELECT count(*) FROM pg_stat_activity WHERE query ~ '^select pg_sleep'}),
+ '0', 'all queries were canceled');
+}
+
+done_testing();