On 25/06/2024 05:25, Noah Misch wrote:
On Mon, Jun 24, 2024 at 11:03:09AM -0400, Tom Lane wrote:
Heikki Linnakangas <hlinn...@iki.fi> writes:
... I can't do that, because InjectionPointRun() requires a PGPROC
entry, because it uses an LWLock. That also makes it impossible to use
injection points in the postmaster. Any chance we could allow injection
points to be triggered without a PGPROC entry? Could we use a simple
spinlock instead?

That sounds fine to me.  If calling hash_search() with a spinlock feels too
awful, a list to linear-search could work.

With a fast path for the case that no injection points
are attached or something?

Even taking a spinlock in the postmaster is contrary to project
policy.  Maybe we could look the other way for debug-only code,
but it seems like a pretty horrible precedent.

If you're actually using an injection point in the postmaster, that would be
the least of concerns.  It is something of a concern for running an injection
point build while not attaching any injection point.  One solution could be a
GUC to control whether the postmaster participates in injection points.
Another could be to make the data structure readable with atomics only.

I came up with the attached. It replaces the shmem hash table with an array that's scanned linearly. On each slot in the array, there's a generation number that indicates whether the slot is in use, and allows detecting concurrent modifications without locks. The attach/detach operations still hold the LWLock, but InjectionPointRun() is now lock-free, so it can be used without a PGPROC entry.

It's now usable from postmaster too. However, it's theoretically possible that if shared memory is overwritten with garbage, the garbage looks like a valid injection point with a name that matches one of the injection points that postmaster looks at. That seems unlikely enough that I think we can accept the risk. To close that gap 100% I think a GUC is the only solution.

Note that until we actually add an injection point to a function that runs in the postmaster, there's no risk. If we're uneasy about that, we could add an assertion to InjectionPointRun() to prevent it from running in the postmaster, so that we don't cross that line inadvertently.

Thoughts?

--
Heikki Linnakangas
Neon (https://neon.tech)
From ca331988b67937b942db5c55770919306931d1ac Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Mon, 8 Jul 2024 16:09:46 +0300
Subject: [PATCH 1/1] Use atomics to avoid locking InjectionPointRun()

---
 src/backend/utils/misc/injection_point.c | 359 +++++++++++++++--------
 src/tools/pgindent/typedefs.list         |   1 +
 2 files changed, 243 insertions(+), 117 deletions(-)

diff --git a/src/backend/utils/misc/injection_point.c b/src/backend/utils/misc/injection_point.c
index 48f29e9b60..c5b2310532 100644
--- a/src/backend/utils/misc/injection_point.c
+++ b/src/backend/utils/misc/injection_point.c
@@ -21,7 +21,6 @@
 
 #include "fmgr.h"
 #include "miscadmin.h"
-#include "port/pg_bitutils.h"
 #include "storage/fd.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
@@ -31,22 +30,35 @@
 
 #ifdef USE_INJECTION_POINTS
 
-/*
- * Hash table for storing injection points.
- *
- * InjectionPointHash is used to find an injection point by name.
- */
-static HTAB *InjectionPointHash;	/* find points from names */
-
 /* Field sizes */
 #define INJ_NAME_MAXLEN		64
 #define INJ_LIB_MAXLEN		128
 #define INJ_FUNC_MAXLEN		128
 #define INJ_PRIVATE_MAXLEN	1024
 
-/* Single injection point stored in InjectionPointHash */
+/* Single injection point stored in shared memory */
 typedef struct InjectionPointEntry
 {
+	/*
+	 * Because injection points need to be usable without LWLocks, we use a
+	 * generation counter on each entry to to allow safe, lock-free reading.
+	 *
+	 * To read an entry, first read the current 'generation' value.  If it's
+	 * even, then the slot is currently unused, and odd means it's in use.
+	 * When reading the other fields, beware that they may change while
+	 * reading them, if the entry is released and reused!  After reading the
+	 * other fields, read 'generation' again: if its value hasn't changed, you
+	 * can be certain that the other fields you read are valid.  Otherwise,
+	 * the slot was concurrently recycled, and you should ignore it.
+	 *
+	 * When adding an entry, you must store all the other fields first, and
+	 * then update the generation number, with an appropriate memory barrier
+	 * in between. In addition to that protocol, you must also hold
+	 * InjectionPointLock, to protect two backends modifying the array at the
+	 * same time.
+	 */
+	pg_atomic_uint64 generation;
+
 	char		name[INJ_NAME_MAXLEN];	/* hash key */
 	char		library[INJ_LIB_MAXLEN];	/* library */
 	char		function[INJ_FUNC_MAXLEN];	/* function */
@@ -58,8 +70,22 @@ typedef struct InjectionPointEntry
 	char		private_data[INJ_PRIVATE_MAXLEN];
 } InjectionPointEntry;
 
-#define INJECTION_POINT_HASH_INIT_SIZE	16
-#define INJECTION_POINT_HASH_MAX_SIZE	128
+#define MAX_INJECTION_POINTS	128
+
+/*
+ * Shared memory array of active injection points.
+ *
+ * 'max_inuse' is the highest index currently in use, plus one.  It's just an
+ * optimization to.avoid scanning through the whole entry, in the common case
+ * that there are no injection points, or only a few.
+ */
+typedef struct InjectionPointsCtl
+{
+	pg_atomic_uint32 max_inuse;
+	InjectionPointEntry entries[MAX_INJECTION_POINTS];
+} InjectionPointsCtl;
+
+static InjectionPointsCtl *ActiveInjectionPoints;
 
 /*
  * Backend local cache of injection callbacks already loaded, stored in
@@ -68,6 +94,8 @@ typedef struct InjectionPointEntry
 typedef struct InjectionPointCacheEntry
 {
 	char		name[INJ_NAME_MAXLEN];
+	int			slot_idx;
+	uint64		generation;
 	char		private_data[INJ_PRIVATE_MAXLEN];
 	InjectionPointCallback callback;
 } InjectionPointCacheEntry;
@@ -79,8 +107,10 @@ static HTAB *InjectionPointCache = NULL;
  *
  * Add an injection point to the local cache.
  */
-static void
+static InjectionPointCacheEntry *
 injection_point_cache_add(const char *name,
+						  int slot_idx,
+						  uint64 generation,
 						  InjectionPointCallback callback,
 						  const void *private_data)
 {
@@ -97,7 +127,7 @@ injection_point_cache_add(const char *name,
 		hash_ctl.hcxt = TopMemoryContext;
 
 		InjectionPointCache = hash_create("InjectionPoint cache hash",
-										  INJECTION_POINT_HASH_MAX_SIZE,
+										  MAX_INJECTION_POINTS,
 										  &hash_ctl,
 										  HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
 	}
@@ -107,9 +137,12 @@ injection_point_cache_add(const char *name,
 
 	Assert(!found);
 	strlcpy(entry->name, name, sizeof(entry->name));
+	entry->slot_idx = slot_idx;
+	entry->generation = generation;
 	entry->callback = callback;
-	if (private_data != NULL)
-		memcpy(entry->private_data, private_data, INJ_PRIVATE_MAXLEN);
+	memcpy(entry->private_data, private_data, INJ_PRIVATE_MAXLEN);
+
+	return entry;
 }
 
 /*
@@ -122,11 +155,10 @@ injection_point_cache_add(const char *name,
 static void
 injection_point_cache_remove(const char *name)
 {
-	/* leave if no cache */
-	if (InjectionPointCache == NULL)
-		return;
+	bool		found PG_USED_FOR_ASSERTS_ONLY;
 
-	(void) hash_search(InjectionPointCache, name, HASH_REMOVE, NULL);
+	(void) hash_search(InjectionPointCache, name, HASH_REMOVE, &found);
+	Assert(found);
 }
 
 /*
@@ -134,29 +166,32 @@ injection_point_cache_remove(const char *name)
  *
  * Load an injection point into the local cache.
  */
-static void
-injection_point_cache_load(InjectionPointEntry *entry_by_name)
+static InjectionPointCacheEntry *
+injection_point_cache_load(InjectionPointEntry *entry, int slot_idx, uint64 generation)
 {
 	char		path[MAXPGPATH];
 	void	   *injection_callback_local;
 
 	snprintf(path, MAXPGPATH, "%s/%s%s", pkglib_path,
-			 entry_by_name->library, DLSUFFIX);
+			 entry->library, DLSUFFIX);
 
 	if (!pg_file_exists(path))
 		elog(ERROR, "could not find library \"%s\" for injection point \"%s\"",
-			 path, entry_by_name->name);
+			 path, entry->name);
 
 	injection_callback_local = (void *)
-		load_external_function(path, entry_by_name->function, false, NULL);
+		load_external_function(path, entry->function, false, NULL);
 
 	if (injection_callback_local == NULL)
 		elog(ERROR, "could not find function \"%s\" in library \"%s\" for injection point \"%s\"",
-			 entry_by_name->function, path, entry_by_name->name);
-
-	/* add it to the local cache when found */
-	injection_point_cache_add(entry_by_name->name, injection_callback_local,
-							  entry_by_name->private_data);
+			 entry->function, path, entry->name);
+
+	/* add it to the local cache */
+	return injection_point_cache_add(entry->name,
+									 slot_idx,
+									 generation,
+									 injection_callback_local,
+									 entry->private_data);
 }
 
 /*
@@ -193,8 +228,7 @@ InjectionPointShmemSize(void)
 #ifdef USE_INJECTION_POINTS
 	Size		sz = 0;
 
-	sz = add_size(sz, hash_estimate_size(INJECTION_POINT_HASH_MAX_SIZE,
-										 sizeof(InjectionPointEntry)));
+	sz = add_size(sz, sizeof(InjectionPointsCtl));
 	return sz;
 #else
 	return 0;
@@ -208,16 +242,20 @@ void
 InjectionPointShmemInit(void)
 {
 #ifdef USE_INJECTION_POINTS
-	HASHCTL		info;
-
-	/* key is a NULL-terminated string */
-	info.keysize = sizeof(char[INJ_NAME_MAXLEN]);
-	info.entrysize = sizeof(InjectionPointEntry);
-	InjectionPointHash = ShmemInitHash("InjectionPoint hash",
-									   INJECTION_POINT_HASH_INIT_SIZE,
-									   INJECTION_POINT_HASH_MAX_SIZE,
-									   &info,
-									   HASH_ELEM | HASH_FIXED_SIZE | HASH_STRINGS);
+	bool		found;
+
+	ActiveInjectionPoints = ShmemInitStruct("InjectionPoint hash",
+											sizeof(InjectionPointsCtl),
+											&found);
+	if (!IsUnderPostmaster)
+	{
+		Assert(!found);
+		pg_atomic_init_u32(&ActiveInjectionPoints->max_inuse, 0);
+		for (int i = 0; i < MAX_INJECTION_POINTS; i++)
+			pg_atomic_init_u64(&ActiveInjectionPoints->entries[i].generation, 0);
+	}
+	else
+		Assert(found);
 #endif
 }
 
@@ -232,8 +270,10 @@ InjectionPointAttach(const char *name,
 					 int private_data_size)
 {
 #ifdef USE_INJECTION_POINTS
-	InjectionPointEntry *entry_by_name;
-	bool		found;
+	InjectionPointEntry *entry;
+	uint64		generation;
+	uint32		max_inuse;
+	int			free_idx;
 
 	if (strlen(name) >= INJ_NAME_MAXLEN)
 		elog(ERROR, "injection point name %s too long (maximum of %u)",
@@ -253,21 +293,51 @@ InjectionPointAttach(const char *name,
 	 * exist.  For testing purposes this should be fine.
 	 */
 	LWLockAcquire(InjectionPointLock, LW_EXCLUSIVE);
-	entry_by_name = (InjectionPointEntry *)
-		hash_search(InjectionPointHash, name,
-					HASH_ENTER, &found);
-	if (found)
-		elog(ERROR, "injection point \"%s\" already defined", name);
+	max_inuse = pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse);
+	free_idx = -1;
+
+	for (int idx = 0; idx < max_inuse; idx++)
+	{
+		entry = &ActiveInjectionPoints->entries[idx];
+		generation = pg_atomic_read_u64(&entry->generation);
+		if (generation % 2 == 0)
+		{
+			/*
+			 * Found a free slot where we can add the new entry, but keep
+			 * going so that we will find out if the entry already exists.
+			 */
+			if (free_idx == -1)
+				free_idx = idx;
+		}
+
+		if (strcmp(entry->name, name) == 0)
+			elog(ERROR, "injection point \"%s\" already defined", name);
+	}
+	if (free_idx == -1)
+	{
+		if (max_inuse == MAX_INJECTION_POINTS)
+			elog(ERROR, "too many injection points");
+		free_idx = max_inuse;
+	}
+	entry = &ActiveInjectionPoints->entries[free_idx];
+	generation = pg_atomic_read_u64(&entry->generation);
+	Assert(generation % 2 == 0);
 
 	/* Save the entry */
-	strlcpy(entry_by_name->name, name, sizeof(entry_by_name->name));
-	entry_by_name->name[INJ_NAME_MAXLEN - 1] = '\0';
-	strlcpy(entry_by_name->library, library, sizeof(entry_by_name->library));
-	entry_by_name->library[INJ_LIB_MAXLEN - 1] = '\0';
-	strlcpy(entry_by_name->function, function, sizeof(entry_by_name->function));
-	entry_by_name->function[INJ_FUNC_MAXLEN - 1] = '\0';
+	strlcpy(entry->name, name, sizeof(entry->name));
+	entry->name[INJ_NAME_MAXLEN - 1] = '\0';
+	strlcpy(entry->library, library, sizeof(entry->library));
+	entry->library[INJ_LIB_MAXLEN - 1] = '\0';
+	strlcpy(entry->function, function, sizeof(entry->function));
+	entry->function[INJ_FUNC_MAXLEN - 1] = '\0';
 	if (private_data != NULL)
-		memcpy(entry_by_name->private_data, private_data, private_data_size);
+		memcpy(entry->private_data, private_data, private_data_size);
+
+	pg_write_barrier();
+	pg_atomic_write_u64(&entry->generation, generation + 1);
+
+	if (free_idx + 1 > max_inuse)
+		pg_atomic_write_u32(&ActiveInjectionPoints->max_inuse, free_idx + 1);
 
 	LWLockRelease(InjectionPointLock);
 
@@ -285,10 +355,45 @@ bool
 InjectionPointDetach(const char *name)
 {
 #ifdef USE_INJECTION_POINTS
-	bool		found;
+	bool		found = false;
+	int			idx;
+	int			max_inuse;
 
 	LWLockAcquire(InjectionPointLock, LW_EXCLUSIVE);
-	hash_search(InjectionPointHash, name, HASH_REMOVE, &found);
+
+	/* Find it in the shmem array, and mark the entry as unused */
+	max_inuse = (int) pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse);
+	for (idx = max_inuse - 1; idx >= 0; --idx)
+	{
+		InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+		uint64		generation;
+
+		generation = pg_atomic_read_u64(&entry->generation);
+		if (generation % 2 == 0)
+			continue;
+		if (strcmp(entry->name, name) == 0)
+		{
+			Assert(!found);
+			found = true;
+			pg_atomic_write_u64(&entry->generation, generation + 1);
+			break;
+		}
+	}
+
+	/* If we just removed the highest-numbered entry, update 'max_inuse' */
+	if (found && idx == max_inuse - 1)
+	{
+		for (; idx >= 0; --idx)
+		{
+			InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+			uint64		generation;
+
+			generation = pg_atomic_read_u64(&entry->generation);
+			if (generation % 2 != 0)
+				break;
+		}
+		pg_atomic_write_u32(&ActiveInjectionPoints->max_inuse, idx + 1);
+	}
 	LWLockRelease(InjectionPointLock);
 
 	if (!found)
@@ -302,46 +407,97 @@ InjectionPointDetach(const char *name)
 }
 
 /*
- * Load an injection point into the local cache.
+ * Common workhorse of InjectionPointRun() and InjectionPointLoad()
  *
- * This is useful to be able to load an injection point before running it,
- * especially if the injection point is called in a code path where memory
- * allocations cannot happen, like critical sections.
+ * Checks if an injection point exists in shared memory, and update
+ * the local cache entry accordingly.
  */
-void
-InjectionPointLoad(const char *name)
+static InjectionPointCacheEntry *
+InjectionPointCacheRefresh(const char *name)
 {
-#ifdef USE_INJECTION_POINTS
-	InjectionPointEntry *entry_by_name;
-	bool		found;
+	uint32		max_inuse;
+	int			namelen;
+	InjectionPointEntry local_copy;
+	InjectionPointCacheEntry *cached;
 
-	LWLockAcquire(InjectionPointLock, LW_SHARED);
-	entry_by_name = (InjectionPointEntry *)
-		hash_search(InjectionPointHash, name,
-					HASH_FIND, &found);
+	/*
+	 * First read the number of in-use slots.  More entries can be added or
+	 * existing ones can be removed while we're reading them.  If the entry
+	 * we're looking for is concurrently added or remoed, we might or might
+	 * not see it.  That's OK.
+	 */
+	max_inuse = pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse);
+	if (max_inuse == 0)
+	{
+		if (InjectionPointCache)
+		{
+			hash_destroy(InjectionPointCache);
+			InjectionPointCache = NULL;
+		}
+		return false;
+	}
 
 	/*
-	 * If not found, do nothing and remove it from the local cache if it
-	 * existed there.
+	 * If we have this entry in the local cache, check if the cached entry is
+	 * still valid.
 	 */
-	if (!found)
+	cached = injection_point_cache_get(name);
+	if (cached)
 	{
+		int			idx = cached->slot_idx;
+		InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+
+		if (pg_atomic_read_u64(&entry->generation) == cached->generation)
+		{
+			/* still good */
+			return cached;
+		}
 		injection_point_cache_remove(name);
-		LWLockRelease(InjectionPointLock);
-		return;
+		cached = NULL;
 	}
 
-	/* Check first the local cache, and leave if this entry exists. */
-	if (injection_point_cache_get(name) != NULL)
+	namelen = strlen(name);
+	for (int idx = 0; idx < max_inuse; idx++)
 	{
-		LWLockRelease(InjectionPointLock);
-		return;
-	}
+		InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+		uint64		generation = pg_atomic_read_u64(&entry->generation);
 
-	/* Nothing?  Then load it and leave */
-	injection_point_cache_load(entry_by_name);
+		if (generation % 2 == 0)
+			continue;
 
-	LWLockRelease(InjectionPointLock);
+		pg_read_barrier();
+		if (memcmp(entry->name, name, namelen + 1) != 0)
+			continue;
+
+		/*
+		 * The entry can change at any time, if the injection point is
+		 * concurrently detached.  Copy it to local memory, and re-check the
+		 * generation.  If the generation hasn't changed, we know our local
+		 * copy is coherent.
+		 */
+		memcpy(&local_copy, entry, sizeof(InjectionPointEntry));
+
+		pg_read_barrier();
+		if (pg_atomic_read_u64(&entry->generation) != generation)
+			continue;			/* was detached concurrently */
+
+		return injection_point_cache_load(&local_copy, idx, generation);
+	}
+	return NULL;
+}
+
+/*
+ * Load an injection point into the local cache.
+ *
+ * This is useful to be able to load an injection point before running it,
+ * especially if the injection point is called in a code path where memory
+ * allocations cannot happen, like critical sections.
+ */
+void
+InjectionPointLoad(const char *name)
+{
+#ifdef USE_INJECTION_POINTS
+	InjectionPointCacheRefresh(name);
 #else
 	elog(ERROR, "Injection points are not supported by this build");
 #endif
@@ -357,42 +513,11 @@ void
 InjectionPointRun(const char *name)
 {
 #ifdef USE_INJECTION_POINTS
-	InjectionPointEntry *entry_by_name;
-	bool		found;
 	InjectionPointCacheEntry *cache_entry;
 
-	LWLockAcquire(InjectionPointLock, LW_SHARED);
-	entry_by_name = (InjectionPointEntry *)
-		hash_search(InjectionPointHash, name,
-					HASH_FIND, &found);
-
-	/*
-	 * If not found, do nothing and remove it from the local cache if it
-	 * existed there.
-	 */
-	if (!found)
-	{
-		injection_point_cache_remove(name);
-		LWLockRelease(InjectionPointLock);
-		return;
-	}
-
-	/*
-	 * Check if the callback exists in the local cache, to avoid unnecessary
-	 * external loads.
-	 */
-	if (injection_point_cache_get(name) == NULL)
-	{
-		/* not found in local cache, so load and register it */
-		injection_point_cache_load(entry_by_name);
-	}
-
-	/* Now loaded, so get it. */
-	cache_entry = injection_point_cache_get(name);
-
-	LWLockRelease(InjectionPointLock);
-
-	cache_entry->callback(name, cache_entry->private_data);
+	cache_entry = InjectionPointCacheRefresh(name);
+	if (cache_entry)
+		cache_entry->callback(name, cache_entry->private_data);
 #else
 	elog(ERROR, "Injection points are not supported by this build");
 #endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9320e4d808..c7bab382aa 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1238,6 +1238,7 @@ InjectionPointCallback
 InjectionPointCondition
 InjectionPointConditionType
 InjectionPointEntry
+InjectionPointsCtl
 InjectionPointSharedState
 InlineCodeBlock
 InsertStmt
-- 
2.39.2

Reply via email to