On 11/01/2016 03:29 PM, Robert Haas wrote:
On Tue, Nov 1, 2016 at 10:21 AM, Tomas Vondra
<tomas.von...@2ndquadrant.com> wrote:
Clearly we need to pass some information to the worker processes, so that
they know whether to instrument the query or not. I don't know if there's a
good non-invasive way to do that from an extension - the easiest way I can
think of is using a bit of shared memory to pass the "sample query" flag -
attached is a patch that does that, and it seems to be working fine for me.

Uh, isn't this going to break as soon as there are multiple parallel
queries in process at the same time?


Damn! You're right of course. Who'd guess I need more coffee this early?

Attached is a fix replacing the flag with an array of flags, indexed by ParallelMasterBackendId. Hopefully that makes it work with multiple concurrent parallel queries ... still, I'm not sure this is the right solution.

regards

--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/contrib/auto_explain/auto_explain.c b/contrib/auto_explain/auto_explain.c
index 4ccd2aa..192a3a1 100644
--- a/contrib/auto_explain/auto_explain.c
+++ b/contrib/auto_explain/auto_explain.c
@@ -14,8 +14,13 @@
 
 #include <limits.h>
 
+#include "access/parallel.h"
 #include "commands/explain.h"
 #include "executor/instrument.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "storage/shmem.h"
 #include "utils/guc.h"
 
 PG_MODULE_MAGIC;
@@ -47,10 +52,13 @@ static ExecutorStart_hook_type prev_ExecutorStart = NULL;
 static ExecutorRun_hook_type prev_ExecutorRun = NULL;
 static ExecutorFinish_hook_type prev_ExecutorFinish = NULL;
 static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
+static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
 
 /* Is the current query sampled, per backend */
 static bool current_query_sampled = true;
 
+static int parent;
+
 #define auto_explain_enabled() \
 	(auto_explain_log_min_duration >= 0 && \
 	 (nesting_level == 0 || auto_explain_log_nested_statements))
@@ -65,6 +73,18 @@ static void explain_ExecutorRun(QueryDesc *queryDesc,
 static void explain_ExecutorFinish(QueryDesc *queryDesc);
 static void explain_ExecutorEnd(QueryDesc *queryDesc);
 
+typedef struct AutoExplainState
+{
+	LWLock	   *lock;							/* protects shared statr */
+	bool		sampled[FLEXIBLE_ARRAY_MEMBER];	/* should we sample this query? */
+} AutoExplainState;
+
+static AutoExplainState *explainState;
+
+static void explain_shmem_startup(void);
+static bool is_query_sampled(void);
+static void set_query_sampled(bool sample);
+static Size explain_shmem_size(void);
 
 /*
  * Module load callback
@@ -72,6 +92,9 @@ static void explain_ExecutorEnd(QueryDesc *queryDesc);
 void
 _PG_init(void)
 {
+	if (!process_shared_preload_libraries_in_progress)
+		elog(ERROR, "auto_explain can only be loaded via shared_preload_libraries");
+
 	/* Define custom GUC variables. */
 	DefineCustomIntVariable("auto_explain.log_min_duration",
 		 "Sets the minimum execution time above which plans will be logged.",
@@ -176,6 +199,8 @@ _PG_init(void)
 							 NULL,
 							 NULL);
 
+	RequestAddinShmemSpace(explain_shmem_size());
+	RequestNamedLWLockTranche("auto_explain", 1);
 	EmitWarningsOnPlaceholders("auto_explain");
 
 	/* Install hooks. */
@@ -187,6 +212,15 @@ _PG_init(void)
 	ExecutorFinish_hook = explain_ExecutorFinish;
 	prev_ExecutorEnd = ExecutorEnd_hook;
 	ExecutorEnd_hook = explain_ExecutorEnd;
+
+	prev_shmem_startup_hook = shmem_startup_hook;
+	shmem_startup_hook = explain_shmem_startup;
+}
+
+static Size
+explain_shmem_size(void)
+{
+	return offsetof(AutoExplainState, sampled) + MaxConnections * sizeof(bool);
 }
 
 /*
@@ -200,6 +234,7 @@ _PG_fini(void)
 	ExecutorRun_hook = prev_ExecutorRun;
 	ExecutorFinish_hook = prev_ExecutorFinish;
 	ExecutorEnd_hook = prev_ExecutorEnd;
+	shmem_startup_hook = prev_shmem_startup_hook;
 }
 
 /*
@@ -208,15 +243,20 @@ _PG_fini(void)
 static void
 explain_ExecutorStart(QueryDesc *queryDesc, int eflags)
 {
+	parent = MyBackendId;
+
 	/*
 	 * For rate sampling, randomly choose top-level statement. Either all
 	 * nested statements will be explained or none will.
 	 */
-	if (auto_explain_log_min_duration >= 0 && nesting_level == 0)
+	if (auto_explain_log_min_duration >= 0 && nesting_level == 0 && (! IsParallelWorker()))
+	{
 		current_query_sampled = (random() < auto_explain_sample_rate *
 								 MAX_RANDOM_VALUE);
+		set_query_sampled(current_query_sampled);
+	}
 
-	if (auto_explain_enabled() && current_query_sampled)
+	if (auto_explain_enabled() && is_query_sampled())
 	{
 		/* Enable per-node instrumentation iff log_analyze is required. */
 		if (auto_explain_log_analyze && (eflags & EXEC_FLAG_EXPLAIN_ONLY) == 0)
@@ -235,7 +275,7 @@ explain_ExecutorStart(QueryDesc *queryDesc, int eflags)
 	else
 		standard_ExecutorStart(queryDesc, eflags);
 
-	if (auto_explain_enabled() && current_query_sampled)
+	if (auto_explain_enabled() && is_query_sampled())
 	{
 		/*
 		 * Set up to track total elapsed time in ExecutorRun.  Make sure the
@@ -305,7 +345,7 @@ explain_ExecutorFinish(QueryDesc *queryDesc)
 static void
 explain_ExecutorEnd(QueryDesc *queryDesc)
 {
-	if (queryDesc->totaltime && auto_explain_enabled() && current_query_sampled)
+	if (queryDesc->totaltime && auto_explain_enabled() && (! IsParallelWorker()))
 	{
 		double		msec;
 
@@ -366,3 +406,57 @@ explain_ExecutorEnd(QueryDesc *queryDesc)
 	else
 		standard_ExecutorEnd(queryDesc);
 }
+
+static void
+explain_shmem_startup(void)
+{
+	bool		found;
+
+	if (prev_shmem_startup_hook)
+		prev_shmem_startup_hook();
+
+	explainState = NULL;
+
+	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+
+	explainState = ShmemInitStruct("auto_explain",
+						   sizeof(AutoExplainState),
+						   &found);
+	if (!found)
+	{
+		/* First time through ... */
+		explainState->lock = &(GetNamedLWLockTranche("auto_explain"))->lock;
+		memset(explainState->sampled, 1, sizeof(bool) * MaxConnections);
+	}
+
+	LWLockRelease(AddinShmemInitLock);
+}
+
+static bool
+is_query_sampled(void)
+{
+	bool sampled;
+
+	/* in leader we can just check the global variable */
+	if (! IsParallelWorker())
+		return current_query_sampled;
+
+	/* in worker processes we need to get the info from shared memory */
+	LWLockAcquire(explainState->lock, LW_SHARED);
+	sampled = explainState->sampled[ParallelMasterBackendId];
+	LWLockRelease(explainState->lock);
+
+	return sampled;
+}
+
+static void
+set_query_sampled(bool sample)
+{
+	/* the decisions should only be made in leader */
+	Assert(! IsParallelWorker());
+
+	/* in worker processes we need to get the info from shared memory */
+	LWLockAcquire(explainState->lock, LW_EXCLUSIVE);
+	explainState->sampled[MyBackendId] = sample;
+	LWLockRelease(explainState->lock);
+}
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to