On 25/06/2016 09:33, Amit Kapila wrote:
> On Wed, Jun 15, 2016 at 11:43 PM, Julien Rouhaud
> <julien.rouh...@dalibo.com> wrote:
>>
>> Attached v4 implements the design you suggested, I hope everything's ok.
>>
> 
> Few review comments:
> 

Thanks for the review.

> 1.
> + if (parallel && (BackgroundWorkerData->parallel_register_count -
> +     BackgroundWorkerData->parallel_terminate_count) >= max_parallel_workers)
> + return false;
> 
> I think this check should be done after acquiring
> BackgroundWorkerLock, otherwise some other backend can simultaneously
> increment parallel_register_count.
> 

You're right, fixed.
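For archive readers: the race Amit describes can be modeled outside the backend. If the capacity test runs before the lock is taken, two backends can both pass it and overcommit the pool. Here is a standalone sketch of the check-under-lock ordering using plain pthreads; the names mirror the patch, but everything here is illustrative, not actual PostgreSQL code:

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Illustrative stand-ins for the patch's shared bgworker state. */
static int max_parallel_workers = 4;
static unsigned parallel_register_count = 0;
static unsigned parallel_terminate_count = 0;
static pthread_mutex_t bgworker_lock = PTHREAD_MUTEX_INITIALIZER;

/* The capacity test itself, factored out so it can be exercised alone. */
static bool
have_parallel_capacity(unsigned registered, unsigned terminated, int maxw)
{
	return (registered - terminated) < (unsigned) maxw;
}

/*
 * Correct ordering: take the lock first, then test and increment.  No
 * other registrant can bump parallel_register_count between our check
 * and our own increment.
 */
static bool
register_parallel_worker(void)
{
	bool		ok;

	pthread_mutex_lock(&bgworker_lock);
	ok = have_parallel_capacity(parallel_register_count,
								parallel_terminate_count,
								max_parallel_workers);
	if (ok)
		parallel_register_count++;
	pthread_mutex_unlock(&bgworker_lock);
	return ok;
}
```

With the check done under the lock, ten registration attempts can never hand out more than max_parallel_workers slots.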

> 2.
> 
> +/*
> + * This flag is used internally for parallel queries, to keep track of the
> + * number of active parallel workers and make sure we never launch more than
> + * max_parallel_workers parallel workers at the same time.  Third part
> + * background workers should not use this flag.
> + */
> +#define BGWORKER_IS_PARALLEL_WORKER 0x0004
> +
> 
> "Third part", do you want to say Third party?
> 

Yes, sorry.  Fixed.

> 3.
> static bool
> SanityCheckBackgroundWorker(BackgroundWorker *worker, int elevel)
> {
> ..
> }
> 
> Isn't it better to have a check in above function such that if
> bgw_flags is BGWORKER_IS_PARALLEL_WORKER and max_parallel_workers is
> zero, then ereport?  Also, consider if it is better to have some other
> checks related to BGWORKER_IS_PARALLEL_WORKER, like if caller sets
> BGWORKER_IS_PARALLEL_WORKER, then it must have database connection and
> shared memory access.
> 

I added these checks.  I don't see any other checks to add right now.
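Boiled down to a pure predicate, the new sanity checks amount to the following (a standalone restatement for clarity, not the patched SanityCheckBackgroundWorker itself; the real function ereports instead of silently returning):

```c
#include <assert.h>
#include <stdbool.h>

/* Flag values as defined in bgworker.h by the patch. */
#define BGWORKER_SHMEM_ACCESS                 0x0001
#define BGWORKER_BACKEND_DATABASE_CONNECTION  0x0002
#define BGWORKER_IS_PARALLEL_WORKER           0x0004

/*
 * A parallel worker is only sane if parallel workers are allowed at
 * all (max_parallel_workers > 0) and if it also requested shared
 * memory access and a database connection.
 */
static bool
parallel_flags_ok(int bgw_flags, int max_parallel_workers)
{
	if ((bgw_flags & BGWORKER_IS_PARALLEL_WORKER) == 0)
		return true;			/* not a parallel worker: nothing to check */
	if (max_parallel_workers == 0)
		return false;
	return (bgw_flags & BGWORKER_SHMEM_ACCESS) != 0 &&
		(bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION) != 0;
}
```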

> 4.
> +      <varlistentry id="guc-max-parallel-workers" xreflabel="max_parallel_workers">
> +       <term><varname>max_parallel_workers</varname> (<type>integer</type>)
> +       <indexterm>
> +        <primary><varname>max_parallel_workers</> configuration parameter</primary>
> +       </indexterm>
> +       </term>
> +       <listitem>
> +        <para>
> +         Sets the maximum number of workers that can be launched at the same
> +         time for the whole server.  This parameter allows the administrator to
> +         reserve background worker slots for for third part dynamic background
> +         workers.  The default value is 4.  Setting this value to 0 disables
> +         parallel query execution.
> +        </para>
> +       </listitem>
> +      </varlistentry>
> 
> How about phrasing it as:
> Sets the maximum number of workers that the system can support for
> parallel queries.  The default value is 4.  Setting this value to 0
> disables parallel query execution.
> 

It's better, thanks.  Should we document somewhere the link between this
parameter and custom dynamic background workers, or is it pretty
self-explanatory?

> 5.
> <primary><varname>max_parallel_workers_per_gather</> configuration parameter</primary>
>        </indexterm>
>        </term>
>        <listitem>
>         <para>
>          Sets the maximum number of workers that can be started by a single
>          <literal>Gather</literal> node.  Parallel workers are taken from the
>          pool of processes established by
>          <xref linkend="guc-max-worker-processes">.
> 
> I think it is better to change above in documentation to indicate that
> "pool of processes established by guc-max-parallel-workers".
> 

The real limit is the minimum of these two values; I think it's
important to be explicit here, since this pool is shared between
parallelism and custom bgworkers.

What about "pool of processes established by guc-max-worker-processes,
limited by guc-max-parallel-workers"?  (used in attached v5 patch)
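Concretely, the planner-side clamp added in allpaths.c reduces to the following (a standalone restatement for the archives, not the patch hunk itself):

```c
#include <assert.h>

/* Same Min() idiom as c.h uses. */
#define Min(x, y) ((x) < (y) ? (x) : (y))

/*
 * The number of workers actually planned for a Gather is bounded by
 * both GUCs: max_parallel_workers (whole server) and
 * max_parallel_workers_per_gather (single node).
 */
static int
clamp_parallel_workers(int parallel_workers,
					   int max_parallel_workers,
					   int max_parallel_workers_per_gather)
{
	return Min(max_parallel_workers,
			   Min(parallel_workers, max_parallel_workers_per_gather));
}
```

So with the defaults (max_parallel_workers = 4, max_parallel_workers_per_gather = 2), a scan that wanted 8 workers is planned with 2, and setting either GUC to 0 disables the parallel scan entirely.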

-- 
Julien Rouhaud
http://dalibo.com - http://dalibo.org
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index a82bf06..6812b0d 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2009,7 +2009,8 @@ include_dir 'conf.d'
          Sets the maximum number of workers that can be started by a single
          <literal>Gather</literal> node.  Parallel workers are taken from the
          pool of processes established by
-         <xref linkend="guc-max-worker-processes">.  Note that the requested
+         <xref linkend="guc-max-worker-processes">, limited by
+         <xref linkend="guc-max-parallel-workers">.  Note that the requested
          number of workers may not actually be available at runtime.  If this
          occurs, the plan will run with fewer workers than expected, which may
          be inefficient.  The default value is 2.  Setting this value to 0
@@ -2018,6 +2019,21 @@ include_dir 'conf.d'
        </listitem>
       </varlistentry>
 
+      <varlistentry id="guc-max-parallel-workers" xreflabel="max_parallel_workers">
+       <term><varname>max_parallel_workers</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>max_parallel_workers</> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Sets the maximum number of workers that the system can support for
+         parallel queries.  The default value is 4.  Setting this value to 0
+         disables parallel query execution.
+        </para>
+       </listitem>
+      </varlistentry>
+
       <varlistentry id="guc-backend-flush-after" xreflabel="backend_flush_after">
        <term><varname>backend_flush_after</varname> (<type>integer</type>)
        <indexterm>
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 088700e..ea7680b 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -452,7 +452,8 @@ LaunchParallelWorkers(ParallelContext *pcxt)
        snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
                         MyProcPid);
        worker.bgw_flags =
-               BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+               BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
+               | BGWORKER_IS_PARALLEL_WORKER;
        worker.bgw_start_time = BgWorkerStart_ConsistentState;
        worker.bgw_restart_time = BGW_NEVER_RESTART;
        worker.bgw_main = ParallelWorkerMain;
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 2e4b670..e1da5f9 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -724,9 +724,11 @@ create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel)
        }
 
        /*
-        * In no case use more than max_parallel_workers_per_gather workers.
+        * In no case use more than max_parallel_workers or
+        * max_parallel_workers_per_gather workers.
         */
-       parallel_workers = Min(parallel_workers, max_parallel_workers_per_gather);
+       parallel_workers = Min(max_parallel_workers, Min(parallel_workers,
+                               max_parallel_workers_per_gather));
 
        /* If any limit was set to zero, the user doesn't want a parallel scan. */
        if (parallel_workers <= 0)
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 8c1dccc..6cb2f4e 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -113,6 +113,7 @@ int                 effective_cache_size = DEFAULT_EFFECTIVE_CACHE_SIZE;
 
 Cost           disable_cost = 1.0e10;
 
+int                    max_parallel_workers = 4;
 int                    max_parallel_workers_per_gather = 2;
 
 bool           enable_seqscan = true;
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 2372311..0e0fc74 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -246,8 +246,9 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
                IsUnderPostmaster && dynamic_shared_memory_type != DSM_IMPL_NONE &&
                parse->commandType == CMD_SELECT && !parse->hasModifyingCTE &&
                parse->utilityStmt == NULL && max_parallel_workers_per_gather > 0 &&
-               !IsParallelWorker() && !IsolationIsSerializable() &&
-               !has_parallel_hazard((Node *) parse, true);
+               max_parallel_workers > 0 && !IsParallelWorker() &&
+               !IsolationIsSerializable() && !has_parallel_hazard((Node *) parse,
+                               true);
 
        /*
         * glob->parallelModeNeeded should tell us whether it's necessary to
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 382edad..4a78a75 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -16,6 +16,7 @@
 
 #include "miscadmin.h"
 #include "libpq/pqsignal.h"
+#include "optimizer/cost.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/postmaster.h"
 #include "storage/barrier.h"
@@ -76,12 +77,15 @@ typedef struct BackgroundWorkerSlot
        bool            terminate;
        pid_t           pid;                    /* InvalidPid = not started yet; 0 = dead */
        uint64          generation;             /* incremented when slot is recycled */
+       bool            parallel;
        BackgroundWorker worker;
 } BackgroundWorkerSlot;
 
 typedef struct BackgroundWorkerArray
 {
        int                     total_slots;
+       uint32          parallel_register_count;
+       uint32          parallel_terminate_count;
        BackgroundWorkerSlot slot[FLEXIBLE_ARRAY_MEMBER];
 } BackgroundWorkerArray;
 
@@ -126,6 +130,8 @@ BackgroundWorkerShmemInit(void)
                int                     slotno = 0;
 
                BackgroundWorkerData->total_slots = max_worker_processes;
+               BackgroundWorkerData->parallel_register_count = 0;
+               BackgroundWorkerData->parallel_terminate_count = 0;
 
                /*
                 * Copy contents of worker list into shared memory.  Record the shared
@@ -144,6 +150,7 @@ BackgroundWorkerShmemInit(void)
                        slot->terminate = false;
                        slot->pid = InvalidPid;
                        slot->generation = 0;
+                       slot->parallel = false;
                        rw->rw_shmem_slot = slotno;
                        rw->rw_worker.bgw_notify_pid = 0;       /* might be reinit after crash */
                        memcpy(&slot->worker, &rw->rw_worker, sizeof(BackgroundWorker));
@@ -272,6 +279,8 @@ BackgroundWorkerStateChange(void)
                        pg_memory_barrier();
                        slot->pid = 0;
                        slot->in_use = false;
+                       if (slot->parallel)
+                               BackgroundWorkerData->parallel_terminate_count++;
                        if (notify_pid != 0)
                                kill(notify_pid, SIGUSR1);
 
@@ -370,6 +379,8 @@ ForgetBackgroundWorker(slist_mutable_iter *cur)
        Assert(rw->rw_shmem_slot < max_worker_processes);
        slot = &BackgroundWorkerData->slot[rw->rw_shmem_slot];
        slot->in_use = false;
+       if (slot->parallel)
+               BackgroundWorkerData->parallel_terminate_count++;
 
        ereport(DEBUG1,
                        (errmsg("unregistering background worker \"%s\"",
@@ -498,6 +509,27 @@ SanityCheckBackgroundWorker(BackgroundWorker *worker, int elevel)
                /* XXX other checks? */
        }
 
+       if ((worker->bgw_flags & BGWORKER_IS_PARALLEL_WORKER) != 0)
+       {
+               if (max_parallel_workers == 0)
+               {
+                       ereport(elevel,
+                                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                        errmsg("background worker \"%s\": cannot request parallel worker if no parallel worker allowed",
+                                                       worker->bgw_name)));
+                       return false;
+               }
+               if ((worker->bgw_flags & BGWORKER_SHMEM_ACCESS) == 0 ||
+                       (worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION) == 0)
+               {
+                       ereport(elevel,
+                                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                        errmsg("background worker \"%s\": cannot request parallel worker without shared memory access and database connection",
+                                                       worker->bgw_name)));
+                       return false;
+               }
+       }
+
        if ((worker->bgw_restart_time < 0 &&
                 worker->bgw_restart_time != BGW_NEVER_RESTART) ||
                (worker->bgw_restart_time > USECS_PER_DAY / 1000))
@@ -824,6 +856,7 @@ RegisterDynamicBackgroundWorker(BackgroundWorker *worker,
 {
        int                     slotno;
        bool            success = false;
+       bool            parallel;
        uint64          generation = 0;
 
        /*
@@ -840,8 +873,18 @@ RegisterDynamicBackgroundWorker(BackgroundWorker *worker,
        if (!SanityCheckBackgroundWorker(worker, ERROR))
                return false;
 
+       parallel = ((worker->bgw_flags & BGWORKER_IS_PARALLEL_WORKER) != 0);
+
        LWLockAcquire(BackgroundWorkerLock, LW_EXCLUSIVE);
 
+       if (parallel && (BackgroundWorkerData->parallel_register_count -
+                                        BackgroundWorkerData->parallel_terminate_count) >=
+                                        max_parallel_workers)
+       {
+               LWLockRelease(BackgroundWorkerLock);
+               return false;
+       }
+
        /*
         * Look for an unused slot.  If we find one, grab it.
         */
@@ -855,7 +898,10 @@ RegisterDynamicBackgroundWorker(BackgroundWorker *worker,
                        slot->pid = InvalidPid;         /* indicates not started yet */
                        slot->generation++;
                        slot->terminate = false;
+                       slot->parallel = parallel;
                        generation = slot->generation;
+                       if (parallel)
+                               BackgroundWorkerData->parallel_register_count++;
 
                        /*
                         * Make sure postmaster doesn't see the slot as in use before it
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 60148b8..e051000 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2657,6 +2657,16 @@ static struct config_int ConfigureNamesInt[] =
        },
 
        {
+               {"max_parallel_workers", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
+                       gettext_noop("Sets the maximum number of parallel processes for the cluster."),
+                       NULL
+               },
+               &max_parallel_workers,
+               4, 0, 1024,
+               NULL, NULL, NULL
+       },
+
+       {
                {"autovacuum_work_mem", PGC_SIGHUP, RESOURCES_MEM,
                        gettext_noop("Sets the maximum memory to be used by each autovacuum worker process."),
                        NULL,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 3fa0540..3ff996f 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -168,6 +168,7 @@
 #effective_io_concurrency = 1          # 1-1000; 0 disables prefetching
 #max_worker_processes = 8              # (change requires restart)
 #max_parallel_workers_per_gather = 2   # taken from max_worker_processes
+#max_parallel_workers = 4              # maximum total number of parallel worker processes
 #old_snapshot_threshold = -1           # 1min-60d; -1 disables; 0 is immediate
                                                                        # (change requires restart)
 #backend_flush_after = 0               # 0 disables, default is 0
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 2a4df2f..190f33b 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -55,6 +55,7 @@ extern PGDLLIMPORT double parallel_setup_cost;
 extern PGDLLIMPORT int effective_cache_size;
 extern Cost disable_cost;
 extern int     max_parallel_workers_per_gather;
+extern int     max_parallel_workers;
 extern bool enable_seqscan;
 extern bool enable_indexscan;
 extern bool enable_indexonlyscan;
diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h
index b6889a3..4236e37 100644
--- a/src/include/postmaster/bgworker.h
+++ b/src/include/postmaster/bgworker.h
@@ -58,6 +58,14 @@
  */
 #define BGWORKER_BACKEND_DATABASE_CONNECTION           0x0002
 
+/*
+ * This flag is used internally for parallel queries, to keep track of the
+ * number of active parallel workers and make sure we never launch more than
+ * max_parallel_workers parallel workers at the same time.  Third party
+ * background workers should not use this flag.
+ */
+#define BGWORKER_IS_PARALLEL_WORKER                                    0x0004
+
 
 typedef void (*bgworker_main_type) (Datum main_arg);
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
