On 25/06/2024 05:25, Noah Misch wrote:
> On Mon, Jun 24, 2024 at 11:03:09AM -0400, Tom Lane wrote:
>> Heikki Linnakangas <hlinn...@iki.fi> writes:
>>> ... I can't do that, because InjectionPointRun() requires a PGPROC
>>> entry, because it uses an LWLock. That also makes it impossible to use
>>> injection points in the postmaster. Any chance we could allow injection
>>> points to be triggered without a PGPROC entry? Could we use a simple
>>> spinlock instead?
>
> That sounds fine to me. If calling hash_search() with a spinlock feels too
> awful, a list to linear-search could work.
>
>>> With a fast path for the case that no injection points
>>> are attached or something?
>>
>> Even taking a spinlock in the postmaster is contrary to project
>> policy. Maybe we could look the other way for debug-only code,
>> but it seems like a pretty horrible precedent.
>
> If you're actually using an injection point in the postmaster, that would be
> the least of concerns. It is something of a concern for running an injection
> point build while not attaching any injection point. One solution could be a
> GUC to control whether the postmaster participates in injection points.
> Another could be to make the data structure readable with atomics only.

I came up with the attached. It replaces the shmem hash table with an
array that's scanned linearly. On each slot in the array, there's a
generation number that indicates whether the slot is in use, and allows
detecting concurrent modifications without locks. The attach/detach
operations still hold the LWLock, but InjectionPointRun() is now
lock-free, so it can be used without a PGPROC entry.
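
To make the protocol concrete, here is a rough standalone sketch of the same idea. It is not the patch itself: it uses C11 <stdatomic.h> rather than the pg_atomic_* API, and the names slot_attach(), slot_detach(), slot_lookup(), MAX_SLOTS and NAME_MAXLEN are made up for illustration. Writers are assumed to be serialized by some external lock, the way the LWLock serializes attach/detach in the patch; only the reader is lock-free. For brevity the sketch never shrinks max_inuse on detach.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define MAX_SLOTS   8		/* hypothetical; the patch uses 128 */
#define NAME_MAXLEN 64

typedef struct Slot
{
	_Atomic uint64_t generation;	/* odd = in use, even = free */
	char		name[NAME_MAXLEN];
} Slot;

static Slot slots[MAX_SLOTS];		/* zero-initialized: all slots free */
static _Atomic uint32_t max_inuse;	/* highest in-use index, plus one */

/* Writer side. Callers must serialize attach/detach among themselves. */
static int
slot_attach(const char *name)
{
	uint32_t	n = atomic_load(&max_inuse);
	int			free_idx = -1;

	for (uint32_t i = 0; i < n; i++)
	{
		uint64_t	gen = atomic_load(&slots[i].generation);

		if (gen % 2 == 0)
		{
			if (free_idx < 0)
				free_idx = (int) i;	/* remember it, keep checking for dups */
		}
		else if (strcmp(slots[i].name, name) == 0)
			return -1;				/* already attached */
	}
	if (free_idx < 0)
	{
		if (n == MAX_SLOTS)
			return -1;				/* array is full */
		free_idx = (int) n;
	}

	Slot	   *s = &slots[free_idx];
	uint64_t	gen = atomic_load(&s->generation);

	assert(gen % 2 == 0);
	/* Fill in the payload first, then publish by making generation odd. */
	strncpy(s->name, name, NAME_MAXLEN - 1);
	s->name[NAME_MAXLEN - 1] = '\0';
	atomic_thread_fence(memory_order_release);
	atomic_store(&s->generation, gen + 1);
	if ((uint32_t) free_idx + 1 > n)
		atomic_store(&max_inuse, (uint32_t) free_idx + 1);
	return free_idx;
}

static bool
slot_detach(const char *name)
{
	uint32_t	n = atomic_load(&max_inuse);

	for (uint32_t i = 0; i < n; i++)
	{
		uint64_t	gen = atomic_load(&slots[i].generation);

		if (gen % 2 == 1 && strcmp(slots[i].name, name) == 0)
		{
			atomic_store(&slots[i].generation, gen + 1);	/* even again */
			return true;
		}
	}
	return false;
}

/*
 * Lock-free reader: copy the payload, then re-check the generation. The
 * plain memcpy can race with a writer (formally a data race in C11, as
 * with any seqlock-style read); the copy is only trusted after the
 * re-check succeeds.
 */
static bool
slot_lookup(const char *name, uint64_t *gen_out)
{
	uint32_t	n = atomic_load(&max_inuse);

	for (uint32_t i = 0; i < n; i++)
	{
		uint64_t	gen = atomic_load(&slots[i].generation);
		char		copy[NAME_MAXLEN];

		if (gen % 2 == 0)
			continue;				/* free slot */
		atomic_thread_fence(memory_order_acquire);
		memcpy(copy, slots[i].name, NAME_MAXLEN);
		atomic_thread_fence(memory_order_acquire);
		if (atomic_load(&slots[i].generation) != gen)
			continue;				/* recycled while we were reading */
		if (strcmp(copy, name) == 0)
		{
			*gen_out = gen;
			return true;
		}
	}
	return false;
}
```

A caller that remembers the slot index and the generation returned by a lookup can later revalidate its cached copy with a single atomic read of that slot's generation, which is roughly what the backend-local cache in the patch relies on.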

It's now usable from postmaster too. However, it's theoretically
possible that if shared memory is overwritten with garbage, the garbage
looks like a valid injection point with a name that matches one of the
injection points that postmaster looks at. That seems unlikely enough
that I think we can accept the risk. To close that gap 100% I think a
GUC is the only solution.

Note that until we actually add an injection point to a function that
runs in the postmaster, there's no risk. If we're uneasy about that, we
could add an assertion to InjectionPointRun() to prevent it from running
in the postmaster, so that we don't cross that line inadvertently.

Thoughts?

--
Heikki Linnakangas
Neon (https://neon.tech)
From ca331988b67937b942db5c55770919306931d1ac Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Mon, 8 Jul 2024 16:09:46 +0300
Subject: [PATCH 1/1] Use atomics to avoid locking InjectionPointRun()
---
src/backend/utils/misc/injection_point.c | 359 +++++++++++++++--------
src/tools/pgindent/typedefs.list | 1 +
2 files changed, 243 insertions(+), 117 deletions(-)
diff --git a/src/backend/utils/misc/injection_point.c b/src/backend/utils/misc/injection_point.c
index 48f29e9b60..c5b2310532 100644
--- a/src/backend/utils/misc/injection_point.c
+++ b/src/backend/utils/misc/injection_point.c
@@ -21,7 +21,6 @@
#include "fmgr.h"
#include "miscadmin.h"
-#include "port/pg_bitutils.h"
#include "storage/fd.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
@@ -31,22 +30,35 @@
#ifdef USE_INJECTION_POINTS
-/*
- * Hash table for storing injection points.
- *
- * InjectionPointHash is used to find an injection point by name.
- */
-static HTAB *InjectionPointHash; /* find points from names */
-
/* Field sizes */
#define INJ_NAME_MAXLEN 64
#define INJ_LIB_MAXLEN 128
#define INJ_FUNC_MAXLEN 128
#define INJ_PRIVATE_MAXLEN 1024
-/* Single injection point stored in InjectionPointHash */
+/* Single injection point stored in shared memory */
typedef struct InjectionPointEntry
{
+ /*
+ * Because injection points need to be usable without LWLocks, we use a
+ * generation counter on each entry to allow safe, lock-free reading.
+ *
+ * To read an entry, first read the current 'generation' value. If it's
+ * even, then the slot is currently unused, and odd means it's in use.
+ * When reading the other fields, beware that they may change while
+ * reading them, if the entry is released and reused! After reading the
+ * other fields, read 'generation' again: if its value hasn't changed, you
+ * can be certain that the other fields you read are valid. Otherwise,
+ * the slot was concurrently recycled, and you should ignore it.
+ *
+ * When adding an entry, you must store all the other fields first, and
+ * then update the generation number, with an appropriate memory barrier
+ * in between. In addition to that protocol, you must also hold
+ * InjectionPointLock, to protect two backends modifying the array at the
+ * same time.
+ */
+ pg_atomic_uint64 generation;
+
char name[INJ_NAME_MAXLEN]; /* hash key */
char library[INJ_LIB_MAXLEN]; /* library */
char function[INJ_FUNC_MAXLEN]; /* function */
@@ -58,8 +70,22 @@ typedef struct InjectionPointEntry
char private_data[INJ_PRIVATE_MAXLEN];
} InjectionPointEntry;
-#define INJECTION_POINT_HASH_INIT_SIZE 16
-#define INJECTION_POINT_HASH_MAX_SIZE 128
+#define MAX_INJECTION_POINTS 128
+
+/*
+ * Shared memory array of active injection points.
+ *
+ * 'max_inuse' is the highest index currently in use, plus one. It's just an
+ * optimization to avoid scanning through the whole array, in the common case
+ * that there are no injection points, or only a few.
+ */
+typedef struct InjectionPointsCtl
+{
+ pg_atomic_uint32 max_inuse;
+ InjectionPointEntry entries[MAX_INJECTION_POINTS];
+} InjectionPointsCtl;
+
+static InjectionPointsCtl *ActiveInjectionPoints;
/*
* Backend local cache of injection callbacks already loaded, stored in
@@ -68,6 +94,8 @@ typedef struct InjectionPointEntry
typedef struct InjectionPointCacheEntry
{
char name[INJ_NAME_MAXLEN];
+ int slot_idx;
+ uint64 generation;
char private_data[INJ_PRIVATE_MAXLEN];
InjectionPointCallback callback;
} InjectionPointCacheEntry;
@@ -79,8 +107,10 @@ static HTAB *InjectionPointCache = NULL;
*
* Add an injection point to the local cache.
*/
-static void
+static InjectionPointCacheEntry *
injection_point_cache_add(const char *name,
+ int slot_idx,
+ uint64 generation,
InjectionPointCallback callback,
const void *private_data)
{
@@ -97,7 +127,7 @@ injection_point_cache_add(const char *name,
hash_ctl.hcxt = TopMemoryContext;
InjectionPointCache = hash_create("InjectionPoint cache hash",
- INJECTION_POINT_HASH_MAX_SIZE,
+ MAX_INJECTION_POINTS,
&hash_ctl,
HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
}
@@ -107,9 +137,12 @@ injection_point_cache_add(const char *name,
Assert(!found);
strlcpy(entry->name, name, sizeof(entry->name));
+ entry->slot_idx = slot_idx;
+ entry->generation = generation;
entry->callback = callback;
- if (private_data != NULL)
- memcpy(entry->private_data, private_data, INJ_PRIVATE_MAXLEN);
+ memcpy(entry->private_data, private_data, INJ_PRIVATE_MAXLEN);
+
+ return entry;
}
/*
@@ -122,11 +155,10 @@ injection_point_cache_add(const char *name,
static void
injection_point_cache_remove(const char *name)
{
- /* leave if no cache */
- if (InjectionPointCache == NULL)
- return;
+ bool found PG_USED_FOR_ASSERTS_ONLY;
- (void) hash_search(InjectionPointCache, name, HASH_REMOVE, NULL);
+ (void) hash_search(InjectionPointCache, name, HASH_REMOVE, &found);
+ Assert(found);
}
/*
@@ -134,29 +166,32 @@ injection_point_cache_remove(const char *name)
*
* Load an injection point into the local cache.
*/
-static void
-injection_point_cache_load(InjectionPointEntry *entry_by_name)
+static InjectionPointCacheEntry *
+injection_point_cache_load(InjectionPointEntry *entry, int slot_idx, uint64 generation)
{
char path[MAXPGPATH];
void *injection_callback_local;
snprintf(path, MAXPGPATH, "%s/%s%s", pkglib_path,
- entry_by_name->library, DLSUFFIX);
+ entry->library, DLSUFFIX);
if (!pg_file_exists(path))
elog(ERROR, "could not find library \"%s\" for injection point \"%s\"",
- path, entry_by_name->name);
+ path, entry->name);
injection_callback_local = (void *)
- load_external_function(path, entry_by_name->function, false, NULL);
+ load_external_function(path, entry->function, false, NULL);
if (injection_callback_local == NULL)
elog(ERROR, "could not find function \"%s\" in library \"%s\" for injection point \"%s\"",
- entry_by_name->function, path, entry_by_name->name);
-
- /* add it to the local cache when found */
- injection_point_cache_add(entry_by_name->name, injection_callback_local,
- entry_by_name->private_data);
+ entry->function, path, entry->name);
+
+ /* add it to the local cache */
+ return injection_point_cache_add(entry->name,
+ slot_idx,
+ generation,
+ injection_callback_local,
+ entry->private_data);
}
/*
@@ -193,8 +228,7 @@ InjectionPointShmemSize(void)
#ifdef USE_INJECTION_POINTS
Size sz = 0;
- sz = add_size(sz, hash_estimate_size(INJECTION_POINT_HASH_MAX_SIZE,
- sizeof(InjectionPointEntry)));
+ sz = add_size(sz, sizeof(InjectionPointsCtl));
return sz;
#else
return 0;
@@ -208,16 +242,20 @@ void
InjectionPointShmemInit(void)
{
#ifdef USE_INJECTION_POINTS
- HASHCTL info;
-
- /* key is a NULL-terminated string */
- info.keysize = sizeof(char[INJ_NAME_MAXLEN]);
- info.entrysize = sizeof(InjectionPointEntry);
- InjectionPointHash = ShmemInitHash("InjectionPoint hash",
- INJECTION_POINT_HASH_INIT_SIZE,
- INJECTION_POINT_HASH_MAX_SIZE,
- &info,
- HASH_ELEM | HASH_FIXED_SIZE | HASH_STRINGS);
+ bool found;
+
+ ActiveInjectionPoints = ShmemInitStruct("InjectionPoint hash",
+ sizeof(InjectionPointsCtl),
+ &found);
+ if (!IsUnderPostmaster)
+ {
+ Assert(!found);
+ pg_atomic_init_u32(&ActiveInjectionPoints->max_inuse, 0);
+ for (int i = 0; i < MAX_INJECTION_POINTS; i++)
+ pg_atomic_init_u64(&ActiveInjectionPoints->entries[i].generation, 0);
+ }
+ else
+ Assert(found);
#endif
}
@@ -232,8 +270,10 @@ InjectionPointAttach(const char *name,
int private_data_size)
{
#ifdef USE_INJECTION_POINTS
- InjectionPointEntry *entry_by_name;
- bool found;
+ InjectionPointEntry *entry;
+ uint64 generation;
+ uint32 max_inuse;
+ int free_idx;
if (strlen(name) >= INJ_NAME_MAXLEN)
elog(ERROR, "injection point name %s too long (maximum of %u)",
@@ -253,21 +293,51 @@ InjectionPointAttach(const char *name,
* exist. For testing purposes this should be fine.
*/
LWLockAcquire(InjectionPointLock, LW_EXCLUSIVE);
- entry_by_name = (InjectionPointEntry *)
- hash_search(InjectionPointHash, name,
- HASH_ENTER, &found);
- if (found)
- elog(ERROR, "injection point \"%s\" already defined", name);
+ max_inuse = pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse);
+ free_idx = -1;
+
+ for (int idx = 0; idx < max_inuse; idx++)
+ {
+ entry = &ActiveInjectionPoints->entries[idx];
+ generation = pg_atomic_read_u64(&entry->generation);
+ if (generation % 2 == 0)
+ {
+ /*
+ * Found a free slot where we can add the new entry, but keep
+ * going so that we will find out if the entry already exists.
+ */
+ if (free_idx == -1)
+ free_idx = idx;
+ }
+
+ else if (strcmp(entry->name, name) == 0)
+ elog(ERROR, "injection point \"%s\" already defined", name);
+ }
+ if (free_idx == -1)
+ {
+ if (max_inuse == MAX_INJECTION_POINTS)
+ elog(ERROR, "too many injection points");
+ free_idx = max_inuse;
+ }
+ entry = &ActiveInjectionPoints->entries[free_idx];
+ generation = pg_atomic_read_u64(&entry->generation);
+ Assert(generation % 2 == 0);
/* Save the entry */
- strlcpy(entry_by_name->name, name, sizeof(entry_by_name->name));
- entry_by_name->name[INJ_NAME_MAXLEN - 1] = '\0';
- strlcpy(entry_by_name->library, library, sizeof(entry_by_name->library));
- entry_by_name->library[INJ_LIB_MAXLEN - 1] = '\0';
- strlcpy(entry_by_name->function, function, sizeof(entry_by_name->function));
- entry_by_name->function[INJ_FUNC_MAXLEN - 1] = '\0';
+ strlcpy(entry->name, name, sizeof(entry->name));
+ entry->name[INJ_NAME_MAXLEN - 1] = '\0';
+ strlcpy(entry->library, library, sizeof(entry->library));
+ entry->library[INJ_LIB_MAXLEN - 1] = '\0';
+ strlcpy(entry->function, function, sizeof(entry->function));
+ entry->function[INJ_FUNC_MAXLEN - 1] = '\0';
if (private_data != NULL)
- memcpy(entry_by_name->private_data, private_data, private_data_size);
+ memcpy(entry->private_data, private_data, private_data_size);
+
+ pg_write_barrier();
+ pg_atomic_write_u64(&entry->generation, generation + 1);
+
+ if (free_idx + 1 > max_inuse)
+ pg_atomic_write_u32(&ActiveInjectionPoints->max_inuse, free_idx + 1);
LWLockRelease(InjectionPointLock);
@@ -285,10 +355,45 @@ bool
InjectionPointDetach(const char *name)
{
#ifdef USE_INJECTION_POINTS
- bool found;
+ bool found = false;
+ int idx;
+ int max_inuse;
LWLockAcquire(InjectionPointLock, LW_EXCLUSIVE);
- hash_search(InjectionPointHash, name, HASH_REMOVE, &found);
+
+ /* Find it in the shmem array, and mark the entry as unused */
+ max_inuse = (int) pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse);
+ for (idx = max_inuse - 1; idx >= 0; --idx)
+ {
+ InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+ uint64 generation;
+
+ generation = pg_atomic_read_u64(&entry->generation);
+ if (generation % 2 == 0)
+ continue;
+ if (strcmp(entry->name, name) == 0)
+ {
+ Assert(!found);
+ found = true;
+ pg_atomic_write_u64(&entry->generation, generation + 1);
+ break;
+ }
+ }
+
+ /* If we just removed the highest-numbered entry, update 'max_inuse' */
+ if (found && idx == max_inuse - 1)
+ {
+ for (; idx >= 0; --idx)
+ {
+ InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+ uint64 generation;
+
+ generation = pg_atomic_read_u64(&entry->generation);
+ if (generation % 2 != 0)
+ break;
+ }
+ pg_atomic_write_u32(&ActiveInjectionPoints->max_inuse, idx + 1);
+ }
LWLockRelease(InjectionPointLock);
if (!found)
@@ -302,46 +407,97 @@ InjectionPointDetach(const char *name)
}
/*
- * Load an injection point into the local cache.
+ * Common workhorse of InjectionPointRun() and InjectionPointLoad()
*
- * This is useful to be able to load an injection point before running it,
- * especially if the injection point is called in a code path where memory
- * allocations cannot happen, like critical sections.
+ * Checks if an injection point exists in shared memory, and updates
+ * the local cache entry accordingly.
*/
-void
-InjectionPointLoad(const char *name)
+static InjectionPointCacheEntry *
+InjectionPointCacheRefresh(const char *name)
{
-#ifdef USE_INJECTION_POINTS
- InjectionPointEntry *entry_by_name;
- bool found;
+ uint32 max_inuse;
+ int namelen;
+ InjectionPointEntry local_copy;
+ InjectionPointCacheEntry *cached;
- LWLockAcquire(InjectionPointLock, LW_SHARED);
- entry_by_name = (InjectionPointEntry *)
- hash_search(InjectionPointHash, name,
- HASH_FIND, &found);
+ /*
+ * First read the number of in-use slots. More entries can be added or
+ * existing ones can be removed while we're reading them. If the entry
+ * we're looking for is concurrently added or removed, we might or might
+ * not see it. That's OK.
+ */
+ max_inuse = pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse);
+ if (max_inuse == 0)
+ {
+ if (InjectionPointCache)
+ {
+ hash_destroy(InjectionPointCache);
+ InjectionPointCache = NULL;
+ }
+ return NULL;
+ }
/*
- * If not found, do nothing and remove it from the local cache if it
- * existed there.
+ * If we have this entry in the local cache, check if the cached entry is
+ * still valid.
*/
- if (!found)
+ cached = injection_point_cache_get(name);
+ if (cached)
{
+ int idx = cached->slot_idx;
+ InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+
+ if (pg_atomic_read_u64(&entry->generation) == cached->generation)
+ {
+ /* still good */
+ return cached;
+ }
injection_point_cache_remove(name);
- LWLockRelease(InjectionPointLock);
- return;
+ cached = NULL;
}
- /* Check first the local cache, and leave if this entry exists. */
- if (injection_point_cache_get(name) != NULL)
+ namelen = strlen(name);
+ for (int idx = 0; idx < max_inuse; idx++)
{
- LWLockRelease(InjectionPointLock);
- return;
- }
+ InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+ uint64 generation = pg_atomic_read_u64(&entry->generation);
- /* Nothing? Then load it and leave */
- injection_point_cache_load(entry_by_name);
+ if (generation % 2 == 0)
+ continue;
- LWLockRelease(InjectionPointLock);
+ pg_read_barrier();
+ if (memcmp(entry->name, name, namelen + 1) != 0)
+ continue;
+
+ /*
+ * The entry can change at any time, if the injection point is
+ * concurrently detached. Copy it to local memory, and re-check the
+ * generation. If the generation hasn't changed, we know our local
+ * copy is coherent.
+ */
+ memcpy(&local_copy, entry, sizeof(InjectionPointEntry));
+
+ pg_read_barrier();
+ if (pg_atomic_read_u64(&entry->generation) != generation)
+ continue; /* was detached concurrently */
+
+ return injection_point_cache_load(&local_copy, idx, generation);
+ }
+ return NULL;
+}
+
+/*
+ * Load an injection point into the local cache.
+ *
+ * This is useful to be able to load an injection point before running it,
+ * especially if the injection point is called in a code path where memory
+ * allocations cannot happen, like critical sections.
+ */
+void
+InjectionPointLoad(const char *name)
+{
+#ifdef USE_INJECTION_POINTS
+ InjectionPointCacheRefresh(name);
#else
elog(ERROR, "Injection points are not supported by this build");
#endif
@@ -357,42 +513,11 @@ void
InjectionPointRun(const char *name)
{
#ifdef USE_INJECTION_POINTS
- InjectionPointEntry *entry_by_name;
- bool found;
InjectionPointCacheEntry *cache_entry;
- LWLockAcquire(InjectionPointLock, LW_SHARED);
- entry_by_name = (InjectionPointEntry *)
- hash_search(InjectionPointHash, name,
- HASH_FIND, &found);
-
- /*
- * If not found, do nothing and remove it from the local cache if it
- * existed there.
- */
- if (!found)
- {
- injection_point_cache_remove(name);
- LWLockRelease(InjectionPointLock);
- return;
- }
-
- /*
- * Check if the callback exists in the local cache, to avoid unnecessary
- * external loads.
- */
- if (injection_point_cache_get(name) == NULL)
- {
- /* not found in local cache, so load and register it */
- injection_point_cache_load(entry_by_name);
- }
-
- /* Now loaded, so get it. */
- cache_entry = injection_point_cache_get(name);
-
- LWLockRelease(InjectionPointLock);
-
- cache_entry->callback(name, cache_entry->private_data);
+ cache_entry = InjectionPointCacheRefresh(name);
+ if (cache_entry)
+ cache_entry->callback(name, cache_entry->private_data);
#else
elog(ERROR, "Injection points are not supported by this build");
#endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9320e4d808..c7bab382aa 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1238,6 +1238,7 @@ InjectionPointCallback
InjectionPointCondition
InjectionPointConditionType
InjectionPointEntry
+InjectionPointsCtl
InjectionPointSharedState
InlineCodeBlock
InsertStmt
--
2.39.2