So this is what I came up with on the plane. I generalized the loading
functionality into load_library_function, which can either load
known postgres functions or call load_external_function.

I am not quite sure if fmgr.c is the best place to put it, but I didn't want
to include stuff from the executor in bgworker.c. I was thinking about
putting it in dfmgr.c (as that's where load_external_function is), but again
it seemed weird to include the executor there.

As with the previous patch, 9.6 will need a quite different patch as we need
to keep compatibility there, but since I am traveling I think it's
better to share what I have so far.

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel
index db9ac3d..b360887 100644
--- a/src/backend/access/transam/README.parallel
+++ b/src/backend/access/transam/README.parallel
@@ -198,7 +198,7 @@ pattern looks like this:
 
 	EnterParallelMode();		/* prohibit unsafe state changes */
 
-	pcxt = CreateParallelContext(entrypoint, nworkers);
+	pcxt = CreateParallelContext("library_name", "function_name", nworkers);
 
 	/* Allow space for application-specific data here. */
 	shm_toc_estimate_chunk(&pcxt->estimator, size);
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index b3d3853..326d4f9 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -61,7 +61,7 @@
 #define PARALLEL_KEY_TRANSACTION_SNAPSHOT	UINT64CONST(0xFFFFFFFFFFFF0006)
 #define PARALLEL_KEY_ACTIVE_SNAPSHOT		UINT64CONST(0xFFFFFFFFFFFF0007)
 #define PARALLEL_KEY_TRANSACTION_STATE		UINT64CONST(0xFFFFFFFFFFFF0008)
-#define PARALLEL_KEY_EXTENSION_TRAMPOLINE	UINT64CONST(0xFFFFFFFFFFFF0009)
+#define PARALLEL_KEY_ENTRYPOINT				UINT64CONST(0xFFFFFFFFFFFF0009)
 
 /* Fixed-size parallel state. */
 typedef struct FixedParallelState
@@ -77,9 +77,6 @@ typedef struct FixedParallelState
 	pid_t		parallel_master_pid;
 	BackendId	parallel_master_backend_id;
 
-	/* Entrypoint for parallel workers. */
-	parallel_worker_main_type entrypoint;
-
 	/* Mutex protects remaining fields. */
 	slock_t		mutex;
 
@@ -109,7 +106,6 @@ static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
 
 /* Private functions. */
 static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
-static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
 static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
 
 
@@ -119,7 +115,8 @@ static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
  * destroyed before exiting the current subtransaction.
  */
 ParallelContext *
-CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
+CreateParallelContext(const char *library_name, const char *function_name,
+					  int nworkers)
 {
 	MemoryContext oldcontext;
 	ParallelContext *pcxt;
@@ -152,7 +149,8 @@ CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
 	pcxt = palloc0(sizeof(ParallelContext));
 	pcxt->subid = GetCurrentSubTransactionId();
 	pcxt->nworkers = nworkers;
-	pcxt->entrypoint = entrypoint;
+	pcxt->library_name = pstrdup(library_name);
+	pcxt->function_name = pstrdup(function_name);
 	pcxt->error_context_stack = error_context_stack;
 	shm_toc_initialize_estimator(&pcxt->estimator);
 	dlist_push_head(&pcxt_list, &pcxt->node);
@@ -164,33 +162,6 @@ CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
 }
 
 /*
- * Establish a new parallel context that calls a function provided by an
- * extension.  This works around the fact that the library might get mapped
- * at a different address in each backend.
- */
-ParallelContext *
-CreateParallelContextForExternalFunction(char *library_name,
-										 char *function_name,
-										 int nworkers)
-{
-	MemoryContext oldcontext;
-	ParallelContext *pcxt;
-
-	/* We might be running in a very short-lived memory context. */
-	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
-
-	/* Create the context. */
-	pcxt = CreateParallelContext(ParallelExtensionTrampoline, nworkers);
-	pcxt->library_name = pstrdup(library_name);
-	pcxt->function_name = pstrdup(function_name);
-
-	/* Restore previous memory context. */
-	MemoryContextSwitchTo(oldcontext);
-
-	return pcxt;
-}
-
-/*
  * Establish the dynamic shared memory segment for a parallel context and
  * copy state and other bookkeeping information that will be needed by
  * parallel workers into it.
@@ -249,15 +220,10 @@ InitializeParallelDSM(ParallelContext *pcxt)
 										pcxt->nworkers));
 		shm_toc_estimate_keys(&pcxt->estimator, 1);
 
-		/* Estimate how much we'll need for extension entrypoint info. */
-		if (pcxt->library_name != NULL)
-		{
-			Assert(pcxt->entrypoint == ParallelExtensionTrampoline);
-			Assert(pcxt->function_name != NULL);
-			shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name)
-								   + strlen(pcxt->function_name) + 2);
-			shm_toc_estimate_keys(&pcxt->estimator, 1);
-		}
+		/* Estimate how much we'll need for the entrypoint info. */
+		shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name)
+							   + strlen(pcxt->function_name) + 2);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
 	}
 
 	/*
@@ -297,7 +263,6 @@ InitializeParallelDSM(ParallelContext *pcxt)
 	fps->parallel_master_pgproc = MyProc;
 	fps->parallel_master_pid = MyProcPid;
 	fps->parallel_master_backend_id = MyBackendId;
-	fps->entrypoint = pcxt->entrypoint;
 	SpinLockInit(&fps->mutex);
 	fps->last_xlog_end = 0;
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
@@ -312,6 +277,8 @@ InitializeParallelDSM(ParallelContext *pcxt)
 		char	   *asnapspace;
 		char	   *tstatespace;
 		char	   *error_queue_space;
+		char	   *entrypointstate;
+		Size		lnamelen;
 
 		/* Serialize shared libraries we have loaded. */
 		libraryspace = shm_toc_allocate(pcxt->toc, library_len);
@@ -368,19 +335,18 @@ InitializeParallelDSM(ParallelContext *pcxt)
 		}
 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
 
-		/* Serialize extension entrypoint information. */
-		if (pcxt->library_name != NULL)
-		{
-			Size		lnamelen = strlen(pcxt->library_name);
-			char	   *extensionstate;
-
-			extensionstate = shm_toc_allocate(pcxt->toc, lnamelen
-										  + strlen(pcxt->function_name) + 2);
-			strcpy(extensionstate, pcxt->library_name);
-			strcpy(extensionstate + lnamelen + 1, pcxt->function_name);
-			shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE,
-						   extensionstate);
-		}
+		/*
+		 * Serialize entrypoint information.  It's unsafe to pass function
+		 * pointers across parallel processes, as a function's address may
+		 * differ in the new process in EXEC_BACKEND builds, so we always
+		 * pass the library name and function name instead.
+		 */
+		lnamelen = strlen(pcxt->library_name);
+		entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen
+										   + strlen(pcxt->function_name) + 2);
+		strcpy(entrypointstate, pcxt->library_name);
+		strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
 	}
 
 	/* Restore previous memory context. */
@@ -946,7 +912,11 @@ ParallelWorkerMain(Datum main_arg)
 	char	   *tsnapspace;
 	char	   *asnapspace;
 	char	   *tstatespace;
+	char	   *entrypointstate;
+	char	   *library_name;
+	char	   *function_name;
 	StringInfoData msgbuf;
+	parallel_worker_main_type entrypt;
 
 	/* Set flag to indicate that we're initializing a parallel worker. */
 	InitializingParallelWorker = true;
@@ -1077,6 +1047,15 @@ ParallelWorkerMain(Datum main_arg)
 	Assert(asnapspace != NULL);
 	PushActiveSnapshot(RestoreSnapshot(asnapspace));
 
+	/* Load the entry point. */
+	entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT);
+	Assert(entrypointstate != NULL);
+	library_name = entrypointstate;
+	function_name = entrypointstate + strlen(library_name) + 1;
+
+	entrypt = (parallel_worker_main_type)
+		load_library_function(library_name, function_name);
+
 	/*
 	 * We've changed which tuples we can see, and must therefore invalidate
 	 * system caches.
@@ -1102,11 +1081,8 @@ ParallelWorkerMain(Datum main_arg)
 
 	/*
 	 * Time to do the real work: invoke the caller-supplied code.
-	 *
-	 * If you get a crash at this line, see the comments for
-	 * ParallelExtensionTrampoline.
 	 */
-	fps->entrypoint(seg, toc);
+	entrypt(seg, toc);
 
 	/* Must exit parallel mode to pop active snapshot. */
 	ExitParallelMode();
@@ -1122,33 +1098,6 @@ ParallelWorkerMain(Datum main_arg)
 }
 
 /*
- * It's unsafe for the entrypoint invoked by ParallelWorkerMain to be a
- * function living in a dynamically loaded module, because the module might
- * not be loaded in every process, or might be loaded but not at the same
- * address.  To work around that problem, CreateParallelContextForExtension()
- * arranges to call this function rather than calling the extension-provided
- * function directly; and this function then looks up the real entrypoint and
- * calls it.
- */
-static void
-ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc)
-{
-	char	   *extensionstate;
-	char	   *library_name;
-	char	   *function_name;
-	parallel_worker_main_type entrypt;
-
-	extensionstate = shm_toc_lookup(toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE);
-	Assert(extensionstate != NULL);
-	library_name = extensionstate;
-	function_name = extensionstate + strlen(library_name) + 1;
-
-	entrypt = (parallel_worker_main_type)
-		load_external_function(library_name, function_name, true, NULL);
-	entrypt(seg, toc);
-}
-
-/*
  * Update shared memory with the ending location of the last WAL record we
  * wrote, if it's greater than the value already stored there.
  */
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 469a32c..49ea1e0 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -112,8 +112,7 @@ static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
 							 SharedExecutorInstrumentation *instrumentation);
 
-/* Helper functions that run in the parallel worker. */
-static void ParallelQueryMain(dsm_segment *seg, shm_toc *toc);
+/* Helper function that runs in the parallel worker. */
 static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
 
 /*
@@ -393,7 +392,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	pstmt_data = ExecSerializePlan(planstate->plan, estate);
 
 	/* Create a parallel context. */
-	pcxt = CreateParallelContext(ParallelQueryMain, nworkers);
+	pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
 	pei->pcxt = pcxt;
 
 	/*
@@ -814,7 +813,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
  * to do this are also stored in the dsm_segment and can be accessed through
  * the shm_toc.
  */
-static void
+void
 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 {
 	BufferUsage *buffer_usage;
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 0823317..0a9ca98 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -15,14 +15,11 @@
 #include <unistd.h>
 
 #include "libpq/pqsignal.h"
-#include "access/parallel.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "port/atomics.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/postmaster.h"
-#include "replication/logicallauncher.h"
-#include "replication/logicalworker.h"
 #include "storage/dsm.h"
 #include "storage/ipc.h"
 #include "storage/latch.h"
@@ -111,27 +108,6 @@ struct BackgroundWorkerHandle
 static BackgroundWorkerArray *BackgroundWorkerData;
 
 /*
- * List of internal background workers. These are used for mapping the
- * function name to actual function when building with EXEC_BACKEND and also
- * to allow these to be loaded outside of shared_preload_libraries.
- */
-typedef struct InternalBGWorkerMain
-{
-	char			   *bgw_function_name;
-	bgworker_main_type	bgw_main;
-} InternalBGWorkerMain;
-
-static const InternalBGWorkerMain InternalBGWorkers[] = {
-	{"ParallelWorkerMain", ParallelWorkerMain},
-	{"ApplyLauncherMain", ApplyLauncherMain},
-	{"ApplyWorkerMain", ApplyWorkerMain},
-	/* Dummy entry marking end of the array. */
-	{NULL, NULL}
-};
-
-static bgworker_main_type GetInternalBgWorkerMain(BackgroundWorker *worker);
-
-/*
  * Calculate shared memory needed.
  */
 Size
@@ -776,18 +752,14 @@ StartBackgroundWorker(void)
 	}
 
 	/*
-	 * For internal workers set the entry point to known function address.
-	 * Otherwise use the entry point specified by library name (which will
-	 * be loaded, if necessary) and a function name (which will be looked up
-	 * in the named library).
+	 * Load the function. For internal workers the function will be loaded
+	 * according to known function map (see fmgr.c).  Otherwise use the entry
+	 * point specified by library name (which will be loaded, if necessary)
+	 * and a function name (which will be looked up in the named library).
 	 */
-	entrypt = GetInternalBgWorkerMain(worker);
-
-	if (entrypt == NULL)
-		entrypt = (bgworker_main_type)
-			load_external_function(worker->bgw_library_name,
-								   worker->bgw_function_name,
-								   true, NULL);
+	entrypt = (bgworker_main_type)
+		load_library_function(worker->bgw_library_name,
+							  worker->bgw_function_name);
 
 	/*
 	 * Note that in normal processes, we would call InitPostgres here.  For a
@@ -806,10 +778,11 @@ StartBackgroundWorker(void)
 }
 
 /*
- * Register a new background worker while processing shared_preload_libraries.
+ * Register a new static background worker.
  *
- * This can only be called in the _PG_init function of a module library
- * that's loaded by shared_preload_libraries; otherwise it has no effect.
+ * This can only be called directly from postmaster or in the _PG_init
+ * function of a module library that's loaded by shared_preload_libraries;
+ * otherwise it will have no effect.
  */
 void
 RegisterBackgroundWorker(BackgroundWorker *worker)
@@ -822,7 +795,7 @@ RegisterBackgroundWorker(BackgroundWorker *worker)
 		 (errmsg("registering background worker \"%s\"", worker->bgw_name)));
 
 	if (!process_shared_preload_libraries_in_progress &&
-		GetInternalBgWorkerMain(worker) == NULL)
+		strcmp(worker->bgw_library_name, "postgres") != 0)
 	{
 		if (!IsUnderPostmaster)
 			ereport(LOG,
@@ -1152,28 +1125,3 @@ TerminateBackgroundWorker(BackgroundWorkerHandle *handle)
 	if (signal_postmaster)
 		SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE);
 }
-
-/*
- * Search the known internal worker array and return its main function
- * pointer if found.
- *
- * Returns NULL if not known internal worker.
- */
-static bgworker_main_type
-GetInternalBgWorkerMain(BackgroundWorker *worker)
-{
-	int i;
-
-	/* Internal workers always have to use postgres as library name. */
-	if (strncmp(worker->bgw_library_name, "postgres", BGW_MAXLEN) != 0)
-		return NULL;
-
-	for (i = 0; InternalBGWorkers[i].bgw_function_name; i++)
-	{
-		if (strncmp(InternalBGWorkers[i].bgw_function_name,
-					worker->bgw_function_name, BGW_MAXLEN) == 0)
-			return InternalBGWorkers[i].bgw_main;
-	}
-
-	return NULL;
-}
diff --git a/src/backend/utils/fmgr/fmgr.c b/src/backend/utils/fmgr/fmgr.c
index 68d2110..c23db6a 100644
--- a/src/backend/utils/fmgr/fmgr.c
+++ b/src/backend/utils/fmgr/fmgr.c
@@ -15,14 +15,18 @@
 
 #include "postgres.h"
 
+#include "access/parallel.h"
 #include "access/tuptoaster.h"
 #include "catalog/pg_language.h"
 #include "catalog/pg_proc.h"
+#include "executor/execParallel.h"
 #include "executor/functions.h"
 #include "lib/stringinfo.h"
 #include "miscadmin.h"
 #include "nodes/nodeFuncs.h"
 #include "pgstat.h"
+#include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/fmgrtab.h"
@@ -61,6 +65,25 @@ static void record_C_func(HeapTuple procedureTuple,
 			  PGFunction user_fn, const Pg_finfo_record *inforec);
 static Datum fmgr_security_definer(PG_FUNCTION_ARGS);
 
+/*
+ * These are used for mapping a function name to the actual function when
+ * building with EXEC_BACKEND, and also to allow bgworkers to use these
+ * functions outside of shared_preload_libraries.
+ */
+typedef struct KnownFunctionPointer
+{
+	char	   *fn_name;
+	void	   *fn_addr;
+} KnownFunctionPointer;
+
+static const KnownFunctionPointer known_functions_map[] = {
+	{"ParallelWorkerMain", ParallelWorkerMain},
+	{"ParallelQueryMain", ParallelQueryMain},
+	{"ApplyLauncherMain", ApplyLauncherMain},
+	{"ApplyWorkerMain", ApplyWorkerMain},
+	/* Dummy entry marking end of the array. */
+	{NULL, NULL}
+};
 
 /*
  * Lookup routines for builtin-function table.  We can search by either Oid
@@ -691,6 +714,36 @@ fmgr_security_definer(PG_FUNCTION_ARGS)
 	return result;
 }
 
+/*
+ * This is similar to load_external_function (which it may call), but will
+ * also try to match the function name against the internal known functions
+ * map when asked to load a function from postgres itself.
+ */
+void *
+load_library_function(char *libraryname, char *funcname)
+{
+	/*
+	 * If the function is to be loaded from postgres itself, search the known
+	 * functions map.
+	 */
+	if (strcmp(libraryname, "postgres") == 0)
+	{
+		int i;
+
+		for (i = 0; known_functions_map[i].fn_name; i++)
+		{
+			if (strcmp(known_functions_map[i].fn_name,
+						funcname) == 0)
+				return known_functions_map[i].fn_addr;
+		}
+
+		/* We can only reach this by programming error. */
+		elog(ERROR, "internal function \"%s\" not found", funcname);
+	}
+
+	/* Otherwise load from external library. */
+	return load_external_function(libraryname, funcname, true, NULL);
+}
 
 /*-------------------------------------------------------------------------
  *		Support routines for callers of fmgr-compatible functions
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 5065a38..b869727 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -52,8 +52,7 @@ extern bool InitializingParallelWorker;
 
 #define		IsParallelWorker()		(ParallelWorkerNumber >= 0)
 
-extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers);
-extern ParallelContext *CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers);
+extern ParallelContext *CreateParallelContext(const char *library_name, const char *function_name, int nworkers);
 extern void InitializeParallelDSM(ParallelContext *pcxt);
 extern void ReinitializeParallelDSM(ParallelContext *pcxt);
 extern void LaunchParallelWorkers(ParallelContext *pcxt);
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 8bc4270..0b7ca59 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -38,4 +38,6 @@ extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
 
+extern void ParallelQueryMain(dsm_segment *seg, shm_toc *toc);
+
 #endif   /* EXECPARALLEL_H */
diff --git a/src/include/fmgr.h b/src/include/fmgr.h
index 0c695e2..2e41edf 100644
--- a/src/include/fmgr.h
+++ b/src/include/fmgr.h
@@ -667,6 +667,7 @@ extern bool get_fn_expr_arg_stable(FmgrInfo *flinfo, int argnum);
 extern bool get_call_expr_arg_stable(fmNodePtr expr, int argnum);
 extern bool get_fn_expr_variadic(FmgrInfo *flinfo);
 extern bool CheckFunctionValidatorAccess(Oid validatorOid, Oid functionOid);
+extern void *load_library_function(char *libraryname, char *funcname);
 
 /*
  * Routines in dfmgr.c
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to