On Wed, Feb 21, 2024 at 07:08:03AM +0900, Michael Paquier wrote: > Well, both you and Andrey are asking for it now, so let's do it. The > implementation is simple: > - Store in InjectionPointSharedState an array of wait_counts and an > array of names. There is only one condition variable. > - When a point wants to wait, it takes the spinlock and looks within > the array of names until it finds a free slot, adds its name into the > array to reserve a wait counter at the same position, releases the > spinlock. Then it loops on the condition variable for an update of > the counter it has reserved. It is possible to make something more > efficient, but at a small size it would not really matter. > - The wakeup takes a point name in argument, acquires the spinlock, > and checks if it can find the point into the array, pinpoints the > location of the counter to update and updates it. Then it broadcasts > the change. > - The wait loop checks its counter, leaves its loop, cancels the > sleep, takes the spinlock to unregister from the array, and leaves. > > I would just hardcode the number of points that can wait, say 5 of > them tracked in shmem? Does that look like what you are looking at?
I was looking at that, and it proves to work OK, so you can do things like waits and wakeups for multiple processes in a controlled manner. The attached patch allows up to 32 waiters. I have switched things so that the information reported in pg_stat_activity is the name of the injection point itself. Comments and ideas are welcome. -- Michael
From 75302cba302b83ce2a6d6eaf30b163f473b87276 Mon Sep 17 00:00:00 2001 From: Michael Paquier <[email protected]> Date: Wed, 21 Feb 2024 16:36:25 +0900 Subject: [PATCH v2 1/2] injection_points: Add routines to wait and wake processes This commit is made of two parts: - A new callback that can be attached to a process to make it wait on a condition variable. The condition checked is registered in shared memory by the module injection_points. - A new SQL function to update the shared state and broadcast the update using a condition variable. The shared state used by the module is registered using the DSM registry, and is optional. --- .../injection_points--1.0.sql | 10 ++ .../injection_points/injection_points.c | 151 ++++++++++++++++++ src/tools/pgindent/typedefs.list | 1 + 3 files changed, 162 insertions(+) diff --git a/src/test/modules/injection_points/injection_points--1.0.sql b/src/test/modules/injection_points/injection_points--1.0.sql index 5944c41716..eed0310cf6 100644 --- a/src/test/modules/injection_points/injection_points--1.0.sql +++ b/src/test/modules/injection_points/injection_points--1.0.sql @@ -24,6 +24,16 @@ RETURNS void AS 'MODULE_PATHNAME', 'injection_points_run' LANGUAGE C STRICT PARALLEL UNSAFE; +-- +-- injection_points_wakeup() +-- +-- Wakes a condition variable waited on in an injection point.
+-- +CREATE FUNCTION injection_points_wakeup(IN point_name TEXT) +RETURNS void +AS 'MODULE_PATHNAME', 'injection_points_wakeup' +LANGUAGE C STRICT PARALLEL UNSAFE; + -- -- injection_points_detach() -- diff --git a/src/test/modules/injection_points/injection_points.c b/src/test/modules/injection_points/injection_points.c index e843e6594f..052b20f9c8 100644 --- a/src/test/modules/injection_points/injection_points.c +++ b/src/test/modules/injection_points/injection_points.c @@ -18,18 +18,72 @@ #include "postgres.h" #include "fmgr.h" +#include "storage/condition_variable.h" +#include "storage/dsm_registry.h" #include "storage/lwlock.h" #include "storage/shmem.h" #include "utils/builtins.h" #include "utils/injection_point.h" #include "utils/wait_event.h" PG_MODULE_MAGIC; +/* Maximum number of waits usable in injection points at once */ +#define INJ_MAX_WAIT 32 +#define INJ_NAME_MAXLEN 64 + +/* Shared state information for injection points. */ +typedef struct InjectionPointSharedState +{ + /* protects accesses to wait_counts and name */ + slock_t lock; + + /* Counters advancing when injection_points_wakeup() is called */ + int wait_counts[INJ_MAX_WAIT]; + + /* Names of injection points attached to wait counters */ + char name[INJ_MAX_WAIT][INJ_NAME_MAXLEN]; + + /* + * Condition variable used for waits and wakeups, checking upon the set of + * wait_counts when waiting. + */ + ConditionVariable wait_point; +} InjectionPointSharedState; + +/* Pointer to shared-memory state.
*/ +static InjectionPointSharedState *inj_state = NULL; + extern PGDLLEXPORT void injection_error(const char *name); extern PGDLLEXPORT void injection_notice(const char *name); +extern PGDLLEXPORT void injection_wait(const char *name); +static void +injection_point_init_state(void *ptr) +{ + InjectionPointSharedState *state = (InjectionPointSharedState *) ptr; + + SpinLockInit(&state->lock); + memset(state->wait_counts, 0, sizeof(state->wait_counts)); + memset(state->name, 0, sizeof(state->name)); + ConditionVariableInit(&state->wait_point); +} + +static void +injection_init_shmem(void) +{ + bool found; + + if (inj_state != NULL) + return; + + inj_state = GetNamedDSMSegment("injection_points", + sizeof(InjectionPointSharedState), + injection_point_init_state, + &found); +} + /* Set of callbacks available to be attached to an injection point. */ void injection_error(const char *name) @@ -43,6 +97,65 @@ injection_notice(const char *name) elog(NOTICE, "notice triggered for injection point %s", name); } +/* Wait on a condition variable, awakened by injection_points_wakeup() */ +void +injection_wait(const char *name) +{ + int old_wait_counts = -1; + int index = -1; + uint32 injection_wait_event = 0; + + if (inj_state == NULL) + injection_init_shmem(); + + /* + * This custom wait event name is not released, but we don't care much for + * testing as this will be short-lived. + */ + injection_wait_event = WaitEventExtensionNew(name); + + /* + * Find a free slot to wait for, and register this injection point's name. + */ + SpinLockAcquire(&inj_state->lock); + for (int i = 0; i < INJ_MAX_WAIT; i++) + { + if (inj_state->name[i][0] == '\0') + { + index = i; + strlcpy(inj_state->name[i], name, INJ_NAME_MAXLEN); + old_wait_counts = inj_state->wait_counts[i]; + break; + } + } + SpinLockRelease(&inj_state->lock); + + if (index < 0) + elog(ERROR, "could not find free slot for wait of injection point %s", + name); + + /* And sleep..
*/ + ConditionVariablePrepareToSleep(&inj_state->wait_point); + for (;;) + { + int new_wait_counts; + + SpinLockAcquire(&inj_state->lock); + new_wait_counts = inj_state->wait_counts[index]; + SpinLockRelease(&inj_state->lock); + + if (old_wait_counts != new_wait_counts) + break; + ConditionVariableSleep(&inj_state->wait_point, injection_wait_event); + } + ConditionVariableCancelSleep(); + + /* Remove us from the waiting list */ + SpinLockAcquire(&inj_state->lock); + inj_state->name[index][0] = '\0'; + SpinLockRelease(&inj_state->lock); +} + /* * SQL function for creating an injection point. */ @@ -58,6 +171,8 @@ injection_points_attach(PG_FUNCTION_ARGS) function = "injection_error"; else if (strcmp(action, "notice") == 0) function = "injection_notice"; + else if (strcmp(action, "wait") == 0) + function = "injection_wait"; else elog(ERROR, "incorrect action \"%s\" for injection point creation", action); @@ -80,6 +195,42 @@ injection_points_run(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +/* + * SQL function for waking a condition variable. + */ +PG_FUNCTION_INFO_V1(injection_points_wakeup); +Datum +injection_points_wakeup(PG_FUNCTION_ARGS) +{ + char *name = text_to_cstring(PG_GETARG_TEXT_PP(0)); + int index = -1; + + if (inj_state == NULL) + injection_init_shmem(); + + /* First bump the wait counter for the injection point to wake */ + SpinLockAcquire(&inj_state->lock); + for (int i = 0; i < INJ_MAX_WAIT; i++) + { + if (strcmp(name, inj_state->name[i]) == 0) + { + index = i; + break; + } + } + if (index < 0) + { + SpinLockRelease(&inj_state->lock); + elog(ERROR, "could not find injection point %s to wake", name); + } + inj_state->wait_counts[index]++; + SpinLockRelease(&inj_state->lock); + + /* And broadcast the change for the waiters */ + ConditionVariableBroadcast(&inj_state->wait_point); + PG_RETURN_VOID(); +} + /* * SQL function for dropping an injection point.
*/ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index d808aad8b0..d7eca00502 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1208,6 +1208,7 @@ InitializeDSMForeignScan_function InitializeWorkerForeignScan_function InjectionPointCacheEntry InjectionPointEntry +InjectionPointSharedState InlineCodeBlock InsertStmt Instrumentation -- 2.43.0
From 79cc3e309c15bec19fec8c3cb2ac2e48bd42d7a2 Mon Sep 17 00:00:00 2001 From: Michael Paquier <[email protected]> Date: Wed, 21 Feb 2024 16:37:17 +0900 Subject: [PATCH v2 2/2] Add regression test for restart points during promotion This test fails when 7863ee4def65 is reverted, checking that a node is able to properly restart following a crash when a restart point was finishing across a promotion. This is an old bug that had no coverage, and injection points make that cheap to achieve. --- src/backend/access/transam/xlog.c | 7 + src/test/recovery/Makefile | 7 +- src/test/recovery/meson.build | 4 + .../t/041_invalid_checkpoint_after_promote.pl | 176 ++++++++++++++++++ 4 files changed, 193 insertions(+), 1 deletion(-) create mode 100644 src/test/recovery/t/041_invalid_checkpoint_after_promote.pl diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 50c347a679..50b045ff08 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -100,6 +100,7 @@ #include "storage/sync.h" #include "utils/guc_hooks.h" #include "utils/guc_tables.h" +#include "utils/injection_point.h" #include "utils/memutils.h" #include "utils/ps_status.h" #include "utils/relmapper.h" @@ -7536,6 +7537,12 @@ CreateRestartPoint(int flags) CheckPointGuts(lastCheckPoint.redo, flags); + /* + * This location needs to be after CheckPointGuts() to ensure that some + * work has already happened during this checkpoint.
+ */ + INJECTION_POINT("CreateRestartPoint"); + /* * Remember the prior checkpoint's redo ptr for * UpdateCheckPointDistanceEstimate() diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile index 17ee353735..f57baba5e8 100644 --- a/src/test/recovery/Makefile +++ b/src/test/recovery/Makefile @@ -9,12 +9,17 @@ # #------------------------------------------------------------------------- -EXTRA_INSTALL=contrib/pg_prewarm contrib/pg_stat_statements contrib/test_decoding +EXTRA_INSTALL=contrib/pg_prewarm \ + contrib/pg_stat_statements \ + contrib/test_decoding \ + src/test/modules/injection_points subdir = src/test/recovery top_builddir = ../../.. include $(top_builddir)/src/Makefile.global +export enable_injection_points + # required for 017_shm.pl and 027_stream_regress.pl REGRESS_SHLIB=$(abs_top_builddir)/src/test/regress/regress$(DLSUFFIX) export REGRESS_SHLIB diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index bf087ac2a9..e4e0e2b4cc 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -6,6 +6,9 @@ tests += { 'bd': meson.current_build_dir(), 'tap': { 'test_kwargs': {'priority': 40}, # recovery tests are slow, start early + 'env': { + 'enable_injection_points': get_option('injection_points') ?
'yes' : 'no', + }, 'tests': [ 't/001_stream_rep.pl', 't/002_archiving.pl', @@ -46,6 +49,7 @@ tests += { 't/038_save_logical_slots_shutdown.pl', 't/039_end_of_wal.pl', 't/040_standby_failover_slots_sync.pl', + 't/041_invalid_checkpoint_after_promote.pl', ], }, } diff --git a/src/test/recovery/t/041_invalid_checkpoint_after_promote.pl b/src/test/recovery/t/041_invalid_checkpoint_after_promote.pl new file mode 100644 index 0000000000..e91f360d12 --- /dev/null +++ b/src/test/recovery/t/041_invalid_checkpoint_after_promote.pl @@ -0,0 +1,176 @@ + +# Copyright (c) 2024, PostgreSQL Global Development Group + +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Time::HiRes qw(usleep); +use Test::More; + +################################################## +# Test race condition when a restart point is running during a promotion, +# checking that WAL segments are correctly removed in the restart point +# while the promotion finishes. +# +# This test relies on an injection point that causes the checkpointer to +# wait in the middle of a restart point on a standby. The checkpointer +# is awakened to finish its restart point only once the promotion of the +# standby is completed, and the node should be able to restart properly.
+################################################## + +if ($ENV{enable_injection_points} ne 'yes') +{ + plan skip_all => 'Injection points not supported by this build'; +} + +# Initialize primary node +my $node_primary = PostgreSQL::Test::Cluster->new('primary'); +$node_primary->init(allows_streaming => 1); +$node_primary->append_conf( + 'postgresql.conf', q[ +checkpoint_timeout = 30s +log_checkpoints = on +restart_after_crash = on +]); +$node_primary->start; + +my $backup_name = 'my_backup'; +$node_primary->backup($backup_name); + +# Setup a standby +my $node_standby = PostgreSQL::Test::Cluster->new('standby1'); +$node_standby->init_from_backup($node_primary, $backup_name, + has_streaming => 1); +$node_standby->start; + +# Dummy table for the upcoming tests. +$node_primary->safe_psql('postgres', 'checkpoint'); +$node_primary->safe_psql('postgres', 'CREATE TABLE prim_tab (a int);'); + +# Register an injection point on the standby so that the follow-up +# restart point will wait on it. +$node_primary->safe_psql('postgres', 'CREATE EXTENSION injection_points;'); +# Wait until the extension has been created on the standby +$node_primary->wait_for_replay_catchup($node_standby); + +# Note that from this point the checkpointer will wait in the middle of +# a restart point on the standby. +$node_standby->safe_psql('postgres', + "SELECT injection_points_attach('CreateRestartPoint', 'wait');"); + +# Execute a restart point on the standby, which we will now be waiting on. +# This needs to be in the background. +my $logstart = -s $node_standby->logfile; +my $psql_session = + $node_standby->background_psql('postgres', on_error_stop => 0); +$psql_session->query_until( + qr/starting_checkpoint/, q( + \echo starting_checkpoint + CHECKPOINT; +)); + +# Switch one WAL segment to make the previous restart point remove the +# segment once the restart point completes.
+$node_primary->safe_psql('postgres', 'INSERT INTO prim_tab VALUES (1);'); +$node_primary->safe_psql('postgres', 'SELECT pg_switch_wal();'); +$node_primary->wait_for_replay_catchup($node_standby); + +# Wait until the checkpointer is in the middle of the restart point +# processing, relying on the custom wait event generated in the +# wait callback used in the injection point previously attached. +ok( $node_standby->poll_query_until( + 'postgres', + qq[SELECT count(*) FROM pg_stat_activity + WHERE backend_type = 'checkpointer' AND wait_event = 'CreateRestartPoint' ;], + '1'), + 'checkpointer is waiting in restart point' +) or die "Timed out while waiting for checkpointer to run restart point"; + +# Check in the logs that the restart point has started on standby. This is +# optional, but let's be sure. +my $log = slurp_file($node_standby->logfile, $logstart); +my $checkpoint_start = 0; +if ($log =~ m/restartpoint starting: immediate wait/) +{ + $checkpoint_start = 1; +} +is($checkpoint_start, 1, 'restartpoint has started'); + +# Trigger promotion during the restart point. +$node_primary->stop; +$node_standby->promote; + +# Update the start position before waking up the checkpointer! +$logstart = -s $node_standby->logfile; + +# Now wake up the checkpointer. +$node_standby->safe_psql('postgres', + "SELECT injection_points_wakeup('CreateRestartPoint');"); + +# Wait until the previous restart point completes on the newly-promoted +# standby, checking the logs for that. +my $checkpoint_complete = 0; +foreach my $i (0 .. 10 * $PostgreSQL::Test::Utils::timeout_default) +{ + my $log = slurp_file($node_standby->logfile, $logstart); + if ($log =~ m/restartpoint complete/) + { + $checkpoint_complete = 1; + last; + } + usleep(100_000); +} +is($checkpoint_complete, 1, 'restart point has completed'); + +# Kill with SIGKILL, forcing all the backends to restart.
+my $psql_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default); +my ($killme_stdin, $killme_stdout, $killme_stderr) = ('', '', ''); +my $killme = IPC::Run::start( + [ + 'psql', '-XAtq', '-v', 'ON_ERROR_STOP=1', '-f', '-', '-d', + $node_standby->connstr('postgres') + ], + '<', + \$killme_stdin, + '>', + \$killme_stdout, + '2>', + \$killme_stderr, + $psql_timeout); +$killme_stdin .= q[ +SELECT pg_backend_pid(); +]; +$killme->pump until $killme_stdout =~ /[[:digit:]]+[\r\n]$/; +my $pid = $killme_stdout; +chomp($pid); +$killme_stdout = ''; +$killme_stderr = ''; + +my $ret = PostgreSQL::Test::Utils::system_log('pg_ctl', 'kill', 'KILL', $pid); +is($ret, 0, 'killed process with KILL'); + +# Wait until the server restarts, finish consuming output. +$killme_stdin .= q[ +SELECT 1; +]; +ok( pump_until( + $killme, + $psql_timeout, + \$killme_stderr, + qr/server closed the connection unexpectedly|connection to server was lost|could not send data to server/m + ), + "psql query died successfully after SIGKILL"); +$killme->finish; + +# Wait till server finishes restarting +$node_standby->poll_query_until('postgres', undef, ''); + +# After recovery, the server should be able to start. +my $stdout; +my $stderr; +($ret, $stdout, $stderr) = $node_standby->psql('postgres', 'select 1'); +is($ret, 0, "psql connect success"); +is($stdout, 1, "psql select 1"); + +done_testing(); -- 2.43.0
signature.asc
Description: PGP signature
