On Mon, Oct 5, 2015 at 8:20 AM, Amit Kapila <amit.kapil...@gmail.com> wrote: > [ new patch for heapam.c changes ]
I went over this in a fair amount of detail and reworked it somewhat. The result is attached as parallel-heapscan-revised.patch. I think the way I did this is a bit cleaner than what you had, although it's basically the same thing. There are fewer changes to initscan, and we don't need one function to initialize the starting block that must be called in each worker and then another one to get the next block, and generally the changes are a bit more localized. I also went over the comments and, I think, improved them. I tweaked the logic for reporting the starting scan position as the last position report; I think the way you had it, the last report would be for one block earlier. I'm pretty happy with this version and hope to commit it soon. There's a second patch attached here as well, parallel-relaunch.patch, which makes it possible to relaunch workers with the same parallel context. Currently, after you WaitForParallelWorkersToFinish(), you must proceed without fail to DestroyParallelContext(). With this rather simple patch, you have the option to instead go back and again LaunchParallelWorkers(), which is nice because it avoids the overhead of setting up a new DSM and filling it with all of your transaction state a second time. I'd like to commit this as well, and I think we should revise execParallel.c to use it. Finally, I've attached some test code in parallel-dummy.patch. This demonstrates how the code in 0001 (parallel-relaunch.patch) and 0002 (parallel-heapscan-revised.patch) can be used. It scans a relation, counts the tuples, and then gratuitously rescans it and counts the tuples again. This shows that rescanning works and that the syncscan position gets left in the right place. -- Robert Haas EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
From d083f8a9e5a45ea5d13358e2d70ef9405caee62f Mon Sep 17 00:00:00 2001 From: Robert Haas <rhaas@postgresql.org> Date: Thu, 15 Oct 2015 19:21:38 -0400 Subject: [PATCH 3/3] parallel_dummy --- contrib/parallel_dummy/Makefile | 19 ++++ contrib/parallel_dummy/parallel_dummy--1.0.sql | 7 ++ contrib/parallel_dummy/parallel_dummy.c | 151 +++++++++++++++++++++++++ contrib/parallel_dummy/parallel_dummy.control | 4 + 4 files changed, 181 insertions(+) create mode 100644 contrib/parallel_dummy/Makefile create mode 100644 contrib/parallel_dummy/parallel_dummy--1.0.sql create mode 100644 contrib/parallel_dummy/parallel_dummy.c create mode 100644 contrib/parallel_dummy/parallel_dummy.control diff --git a/contrib/parallel_dummy/Makefile b/contrib/parallel_dummy/Makefile new file mode 100644 index 0000000..de00f50 --- /dev/null +++ b/contrib/parallel_dummy/Makefile @@ -0,0 +1,19 @@ +MODULE_big = parallel_dummy +OBJS = parallel_dummy.o $(WIN32RES) +PGFILEDESC = "parallel_dummy - dummy use of parallel infrastructure" + +EXTENSION = parallel_dummy +DATA = parallel_dummy--1.0.sql + +REGRESS = parallel_dummy + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/parallel_dummy +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/parallel_dummy/parallel_dummy--1.0.sql b/contrib/parallel_dummy/parallel_dummy--1.0.sql new file mode 100644 index 0000000..d49bd0f --- /dev/null +++ b/contrib/parallel_dummy/parallel_dummy--1.0.sql @@ -0,0 +1,7 @@ +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION parallel_dummy" to load this file. 
\quit + +CREATE FUNCTION parallel_count(rel pg_catalog.regclass, + nworkers pg_catalog.int4) + RETURNS pg_catalog.int8 STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/contrib/parallel_dummy/parallel_dummy.c b/contrib/parallel_dummy/parallel_dummy.c new file mode 100644 index 0000000..a87a573 --- /dev/null +++ b/contrib/parallel_dummy/parallel_dummy.c @@ -0,0 +1,151 @@ +/*-------------------------------------------------------------------------- + * + * parallel_dummy.c + * Test harness code for parallel mode code. + * + * Copyright (C) 2013-2014, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/parallel_dummy/parallel_dummy.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/heapam.h" +#include "access/parallel.h" +#include "access/relscan.h" +#include "access/xact.h" +#include "fmgr.h" +#include "miscadmin.h" +#include "storage/spin.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/snapmgr.h" + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(parallel_count); + +#define TOC_SCAN_KEY 1 +#define TOC_RESULT_KEY 2 + +void _PG_init(void); +void count_worker_main(dsm_segment *seg, shm_toc *toc); + +static void count_helper(HeapScanDesc pscan, int64 *resultp); + +Datum +parallel_count(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + int32 nworkers = PG_GETARG_INT32(1); + int64 *resultp; + int64 result; + ParallelContext *pcxt; + ParallelHeapScanDesc pscan; + HeapScanDesc scan; + Relation rel; + Size sz; + + if (nworkers < 0) + ereport(ERROR, + (errmsg("number of parallel workers must be non-negative"))); + + rel = relation_open(relid, AccessShareLock); + + EnterParallelMode(); + + pcxt = CreateParallelContextForExternalFunction("parallel_dummy", + "count_worker_main", + nworkers); + sz = heap_parallelscan_estimate(GetActiveSnapshot()); + shm_toc_estimate_chunk(&pcxt->estimator, sz); + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(int64)); 
+ shm_toc_estimate_keys(&pcxt->estimator, 2); + InitializeParallelDSM(pcxt); + pscan = shm_toc_allocate(pcxt->toc, sz); + heap_parallelscan_initialize(pscan, rel, GetActiveSnapshot()); + shm_toc_insert(pcxt->toc, TOC_SCAN_KEY, pscan); + resultp = shm_toc_allocate(pcxt->toc, sizeof(int64)); + shm_toc_insert(pcxt->toc, TOC_RESULT_KEY, resultp); + + LaunchParallelWorkers(pcxt); + + scan = heap_beginscan_parallel(rel, pscan); + + /* here's where we do the "real work" ... */ + count_helper(scan, resultp); + + WaitForParallelWorkersToFinish(pcxt); + + heap_rescan(scan, NULL); + *resultp = 0; + + LaunchParallelWorkers(pcxt); + + result = *resultp; + + /* and here's where we do the real work again */ + count_helper(scan, resultp); + + WaitForParallelWorkersToFinish(pcxt); + + result = *resultp; + + heap_endscan(scan); + + DestroyParallelContext(pcxt); + + relation_close(rel, AccessShareLock); + + ExitParallelMode(); + + PG_RETURN_INT64(result); +} + +void +count_worker_main(dsm_segment *seg, shm_toc *toc) +{ + ParallelHeapScanDesc pscan; + Relation rel; + HeapScanDesc scan; + int64 *resultp; + + pscan = shm_toc_lookup(toc, TOC_SCAN_KEY); + resultp = shm_toc_lookup(toc, TOC_RESULT_KEY); + Assert(pscan != NULL && resultp != NULL); + + rel = relation_open(pscan->phs_relid, AccessShareLock); + scan = heap_beginscan_parallel(rel, pscan); + count_helper(scan, resultp); + heap_endscan(scan); + relation_close(rel, AccessShareLock); +} + +static void +count_helper(HeapScanDesc scan, int64 *resultp) +{ + int64 mytuples = 0; + BlockNumber firstblock = InvalidBlockNumber; + + for (;;) + { + HeapTuple tup = heap_getnext(scan, ForwardScanDirection); + + if (!HeapTupleIsValid(tup)) + break; + if (firstblock == InvalidBlockNumber) + firstblock = scan->rs_cblock; + + ++mytuples; + } + + SpinLockAcquire(&scan->rs_parallel->phs_mutex); /* dirty hack */ + *resultp += mytuples; + SpinLockRelease(&scan->rs_parallel->phs_mutex); + + elog(NOTICE, "PID %d counted " INT64_FORMAT " tuples starting 
at block %u", + MyProcPid, mytuples, firstblock); +} diff --git a/contrib/parallel_dummy/parallel_dummy.control b/contrib/parallel_dummy/parallel_dummy.control new file mode 100644 index 0000000..90bae3f --- /dev/null +++ b/contrib/parallel_dummy/parallel_dummy.control @@ -0,0 +1,4 @@ +comment = 'Dummy parallel code' +default_version = '1.0' +module_pathname = '$libdir/parallel_dummy' +relocatable = true -- 2.3.8 (Apple Git-58)
From a826e52d7c2c0213866007e05b20a0c3b3f1eb77 Mon Sep 17 00:00:00 2001 From: Robert Haas <rhaas@postgresql.org> Date: Thu, 15 Oct 2015 19:21:22 -0400 Subject: [PATCH 2/3] Parallel heapscan stuff. --- src/backend/access/heap/heapam.c | 244 +++++++++++++++++++++++++++++++++++++-- src/include/access/heapam.h | 8 +- src/include/access/relscan.h | 20 ++++ 3 files changed, 261 insertions(+), 11 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index bcf9871..66deb1f 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -63,6 +63,7 @@ #include "storage/predicate.h" #include "storage/procarray.h" #include "storage/smgr.h" +#include "storage/spin.h" #include "storage/standby.h" #include "utils/datum.h" #include "utils/inval.h" @@ -80,12 +81,14 @@ bool synchronize_seqscans = true; static HeapScanDesc heap_beginscan_internal(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, + ParallelHeapScanDesc parallel_scan, bool allow_strat, bool allow_sync, bool allow_pagemode, bool is_bitmapscan, bool is_samplescan, bool temp_snap); +static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan); static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options); static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, @@ -226,7 +229,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * results for a non-MVCC snapshot, the caller must hold some higher-level * lock that ensures the interesting tuple(s) won't change.) 
*/ - scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd); + if (scan->rs_parallel != NULL) + scan->rs_nblocks = scan->rs_parallel->phs_nblocks; + else + scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd); /* * If the table is large relative to NBuffers, use a bulk-read access @@ -237,7 +243,8 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * behaviors, independently of the size of the table; also there is a GUC * variable that can disable synchronized scanning.) * - * During a rescan, don't make a new strategy object if we don't have to. + * Note that heap_parallelscan_initialize has a very similar test; if you + * change this, consider changing that one, too. */ if (!RelationUsesLocalBuffers(scan->rs_rd) && scan->rs_nblocks > NBuffers / 4) @@ -250,6 +257,7 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) if (allow_strat) { + /* During a rescan, keep the previous strategy object. */ if (scan->rs_strategy == NULL) scan->rs_strategy = GetAccessStrategy(BAS_BULKREAD); } @@ -260,7 +268,12 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) scan->rs_strategy = NULL; } - if (keep_startblock) + if (scan->rs_parallel != NULL) + { + /* For parallel scan, believe whatever ParallelHeapScanDesc says. */ + scan->rs_syncscan = scan->rs_parallel->phs_syncscan; + } + else if (keep_startblock) { /* * When rescanning, we want to keep the previous startblock setting, @@ -496,7 +509,20 @@ heapgettup(HeapScanDesc scan, tuple->t_data = NULL; return; } - page = scan->rs_startblock; /* first page */ + if (scan->rs_parallel != NULL) + { + page = heap_parallelscan_nextpage(scan); + + /* Other processes might have already finished the scan. 
*/ + if (page == InvalidBlockNumber) + { + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + return; + } + } + else + page = scan->rs_startblock; /* first page */ heapgetpage(scan, page); lineoff = FirstOffsetNumber; /* first offnum */ scan->rs_inited = true; @@ -519,6 +545,9 @@ heapgettup(HeapScanDesc scan, } else if (backward) { + /* backward parallel scan not supported */ + Assert(scan->rs_parallel == NULL); + if (!scan->rs_inited) { /* @@ -669,6 +698,11 @@ heapgettup(HeapScanDesc scan, page = scan->rs_nblocks; page--; } + else if (scan->rs_parallel != NULL) + { + page = heap_parallelscan_nextpage(scan); + finished = (page == InvalidBlockNumber); + } else { page++; @@ -773,7 +807,20 @@ heapgettup_pagemode(HeapScanDesc scan, tuple->t_data = NULL; return; } - page = scan->rs_startblock; /* first page */ + if (scan->rs_parallel != NULL) + { + page = heap_parallelscan_nextpage(scan); + + /* Other processes might have already finished the scan. */ + if (page == InvalidBlockNumber) + { + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + return; + } + } + else + page = scan->rs_startblock; /* first page */ heapgetpage(scan, page); lineindex = 0; scan->rs_inited = true; @@ -793,6 +840,9 @@ heapgettup_pagemode(HeapScanDesc scan, } else if (backward) { + /* backward parallel scan not supported */ + Assert(scan->rs_parallel == NULL); + if (!scan->rs_inited) { /* @@ -932,6 +982,11 @@ heapgettup_pagemode(HeapScanDesc scan, page = scan->rs_nblocks; page--; } + else if (scan->rs_parallel != NULL) + { + page = heap_parallelscan_nextpage(scan); + finished = (page == InvalidBlockNumber); + } else { page++; @@ -1341,7 +1396,7 @@ HeapScanDesc heap_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key) { - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, true, true, true, false, false, false); } @@ -1351,7 +1406,7 @@ heap_beginscan_catalog(Relation 
relation, int nkeys, ScanKey key) Oid relid = RelationGetRelid(relation); Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid)); - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, true, true, true, false, false, true); } @@ -1360,7 +1415,7 @@ heap_beginscan_strat(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, bool allow_strat, bool allow_sync) { - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, allow_strat, allow_sync, true, false, false, false); } @@ -1369,7 +1424,7 @@ HeapScanDesc heap_beginscan_bm(Relation relation, Snapshot snapshot, int nkeys, ScanKey key) { - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, false, false, true, true, false, false); } @@ -1378,7 +1433,7 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, bool allow_strat, bool allow_sync, bool allow_pagemode) { - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, allow_strat, allow_sync, allow_pagemode, false, true, false); } @@ -1386,6 +1441,7 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot, static HeapScanDesc heap_beginscan_internal(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, + ParallelHeapScanDesc parallel_scan, bool allow_strat, bool allow_sync, bool allow_pagemode, @@ -1418,6 +1474,7 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot, scan->rs_allow_strat = allow_strat; scan->rs_allow_sync = allow_sync; scan->rs_temp_snap = temp_snap; + scan->rs_parallel = parallel_scan; /* * we can use page-at-a-time mode if it's an MVCC-safe snapshot @@ -1473,6 +1530,25 @@ heap_rescan(HeapScanDesc scan, * reinitialize scan descriptor */ initscan(scan, key, true); 
+ + /* + * reset parallel scan, if present + */ + if (scan->rs_parallel != NULL) + { + ParallelHeapScanDesc parallel_scan; + + /* + * Caller is responsible for making sure that all workers have + * finished the scan before calling this, so it really shouldn't be + * necessary to acquire the mutex at all. We acquire it anyway, just + * to be tidy. + */ + parallel_scan = scan->rs_parallel; + SpinLockAcquire(&parallel_scan->phs_mutex); + parallel_scan->phs_cblock = parallel_scan->phs_startblock; + SpinLockRelease(&parallel_scan->phs_mutex); + } } /* ---------------- @@ -1532,6 +1608,154 @@ heap_endscan(HeapScanDesc scan) } /* ---------------- + * heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc + * + * Sadly, this doesn't reduce to a constant, because the size required + * to serialize the snapshot can vary. + * ---------------- + */ +Size +heap_parallelscan_estimate(Snapshot snapshot) +{ + return add_size(offsetof(ParallelHeapScanDescData, phs_snapshot_data), + EstimateSnapshotSpace(snapshot)); +} + +/* ---------------- + * heap_parallelscan_initialize - initialize ParallelHeapScanDesc + * + * Must allow as many bytes of shared memory as returned by + * heap_parallelscan_estimate. Call this just once in the leader + * process; then, individual workers attach via heap_beginscan_parallel. 
+ * ---------------- + */ +void +heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation, + Snapshot snapshot) +{ + target->phs_relid = RelationGetRelid(relation); + target->phs_nblocks = RelationGetNumberOfBlocks(relation); + /* compare phs_syncscan initialization to similar logic in initscan */ + target->phs_syncscan = synchronize_seqscans && + !RelationUsesLocalBuffers(relation) && + target->phs_nblocks > NBuffers / 4; + SpinLockInit(&target->phs_mutex); + target->phs_cblock = InvalidBlockNumber; + target->phs_startblock = InvalidBlockNumber; + SerializeSnapshot(snapshot, target->phs_snapshot_data); +} + +/* ---------------- + * heap_beginscan_parallel - join a parallel scan + * + * Caller must hold a suitable lock on the correct relation. + * ---------------- + */ +HeapScanDesc +heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan) +{ + Snapshot snapshot; + + Assert(RelationGetRelid(relation) == parallel_scan->phs_relid); + snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data); + RegisterSnapshot(snapshot); + + return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan, + true, true, true, false, false, true); +} + +/* ---------------- + * heap_parallelscan_nextpage - get the next page to scan + * + * Get the next page to scan. Even if there are no pages left to scan, + * another backend could have grabbed a page to scan and not yet finished + * looking at it, so it doesn't follow that the scan is done when the + * first backend gets an InvalidBlockNumber return. + * ---------------- + */ +static BlockNumber +heap_parallelscan_nextpage(HeapScanDesc scan) +{ + BlockNumber page = InvalidBlockNumber; + BlockNumber sync_startpage = InvalidBlockNumber; + BlockNumber report_page = InvalidBlockNumber; + ParallelHeapScanDesc parallel_scan; + + Assert(scan->rs_parallel); + parallel_scan = scan->rs_parallel; + +retry: + /* Grab the spinlock. 
*/ + SpinLockAcquire(&parallel_scan->phs_mutex); + + /* + * If the scan's startblock has not yet been initialized, we must do so + * now. If this is not a synchronized scan, we just start at block 0, but + * if it is a synchronized scan, we must get the starting position from + * the synchronized scan machinery. We can't hold the spinlock while + * doing that, though, so release the spinlock, get the information we + * need, and retry. If nobody else has initialized the scan in the + * meantime, we'll fill in the value we fetched on the second time + * through. + */ + if (parallel_scan->phs_startblock == InvalidBlockNumber) + { + if (!parallel_scan->phs_syncscan) + parallel_scan->phs_startblock = 0; + else if (sync_startpage != InvalidBlockNumber) + parallel_scan->phs_startblock = sync_startpage; + else + { + SpinLockRelease(&parallel_scan->phs_mutex); + sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks); + goto retry; + } + parallel_scan->phs_cblock = parallel_scan->phs_startblock; + } + + /* + * The current block number is the next one that needs to be scanned, + * unless it's InvalidBlockNumber already, in which case there are no more + * blocks to scan. After remembering the current value, we must advance + * it so that the next call to this function returns the next block to be + * scanned. + */ + page = parallel_scan->phs_cblock; + if (page != InvalidBlockNumber) + { + parallel_scan->phs_cblock++; + if (parallel_scan->phs_cblock >= scan->rs_nblocks) + parallel_scan->phs_cblock = 0; + if (parallel_scan->phs_cblock == parallel_scan->phs_startblock) + { + parallel_scan->phs_cblock = InvalidBlockNumber; + report_page = parallel_scan->phs_startblock; + } + } + + /* Release the lock. */ + SpinLockRelease(&parallel_scan->phs_mutex); + + /* + * Report scan location. Normally, we report the current page number. 
+ * When we reach the end of the scan, though, we report the starting page, + * not the ending page, just so the starting positions for later scans + * don't slew backwards. We only report the position at the end of the + * scan once, though: subsequent callers will report nothing, since + * they will have page == InvalidBlockNumber. + */ + if (scan->rs_syncscan) + { + if (report_page == InvalidBlockNumber) + report_page = page; + if (report_page != InvalidBlockNumber) + ss_report_location(scan->rs_rd, report_page); + } + + return page; +} + +/* ---------------- * heap_getnext - retrieve next tuple in scan * * Fix to work with index relations. diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 75e6b72..98eeadd 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -96,8 +96,9 @@ extern Relation heap_openrv_extended(const RangeVar *relation, #define heap_close(r,l) relation_close(r,l) -/* struct definition appears in relscan.h */ +/* struct definitions appear in relscan.h */ typedef struct HeapScanDescData *HeapScanDesc; +typedef struct ParallelHeapScanDescData *ParallelHeapScanDesc; /* * HeapScanIsValid @@ -126,6 +127,11 @@ extern void heap_rescan_set_params(HeapScanDesc scan, ScanKey key, extern void heap_endscan(HeapScanDesc scan); extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction); +extern Size heap_parallelscan_estimate(Snapshot snapshot); +extern void heap_parallelscan_initialize(ParallelHeapScanDesc target, + Relation relation, Snapshot snapshot); +extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc); + extern bool heap_fetch(Relation relation, Snapshot snapshot, HeapTuple tuple, Buffer *userbuf, bool keep_buf, Relation stats_relation); diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index 6e62319..356c7e6 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -20,6 +20,25 @@ #include "access/itup.h" #include 
"access/tupdesc.h" +/* + * Shared state for parallel heap scan. + * + * Each backend participating in a parallel heap scan has its own + * HeapScanDesc in backend-private memory, and those objects all contain + * a pointer to this structure. The information here must be sufficient + * to properly initialize each new HeapScanDesc as workers join the scan, + * and it must act as a font of block numbers for those workers. + */ +typedef struct ParallelHeapScanDescData +{ + Oid phs_relid; /* OID of relation to scan */ + bool phs_syncscan; /* report location to syncscan logic? */ + BlockNumber phs_nblocks; /* # blocks in relation at start of scan */ + slock_t phs_mutex; /* mutual exclusion for block number fields */ + BlockNumber phs_startblock; /* starting block number */ + BlockNumber phs_cblock; /* current block number */ + char phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER]; +} ParallelHeapScanDescData; typedef struct HeapScanDescData { @@ -49,6 +68,7 @@ typedef struct HeapScanDescData BlockNumber rs_cblock; /* current block # in scan, if any */ Buffer rs_cbuf; /* current buffer in scan, if any */ /* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */ + ParallelHeapScanDesc rs_parallel; /* parallel scan information */ /* these fields only used in page-at-a-time mode and for bitmap scans */ int rs_cindex; /* current tuple's index in vistuples */ -- 2.3.8 (Apple Git-58)
From 2b1069b3ad1febc7ec76615607087d1484a8dcba Mon Sep 17 00:00:00 2001 From: Robert Haas <rhaas@postgresql.org> Date: Thu, 15 Oct 2015 19:19:40 -0400 Subject: [PATCH 1/3] Make it possible to relaunch workers for an existing parallel context. --- src/backend/access/transam/README.parallel | 5 ++++ src/backend/access/transam/parallel.c | 46 ++++++++++++++++++++++++++++++ src/include/access/parallel.h | 1 + 3 files changed, 52 insertions(+) diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel index 1005186..dfcbafa 100644 --- a/src/backend/access/transam/README.parallel +++ b/src/backend/access/transam/README.parallel @@ -221,3 +221,8 @@ pattern looks like this: DestroyParallelContext(pcxt); ExitParallelMode(); + +If desired, after WaitForParallelWorkersToFinish() has been called, another +call to LaunchParallelWorkers() can be made using the same parallel context. +Calls to these two functions can be alternated any number of times before +destroying the parallel context. diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 29d6ed5..8363c0e 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -392,6 +392,49 @@ LaunchParallelWorkers(ParallelContext *pcxt) /* We might be running in a short-lived memory context. */ oldcontext = MemoryContextSwitchTo(TopTransactionContext); + /* + * This function can be called for a parallel context for which it has + * already been called previously, but only if all of the old workers + * have already exited. When this case arises, we need to do some extra + * reinitialization. + */ + if (pcxt->nworkers_launched > 0) + { + FixedParallelState *fps; + char *error_queue_space; + + /* Clean out old worker handles. 
*/ + for (i = 0; i < pcxt->nworkers; ++i) + { + if (pcxt->worker[i].error_mqh != NULL) + elog(ERROR, "previously launched worker still alive"); + if (pcxt->worker[i].bgwhandle != NULL) + { + pfree(pcxt->worker[i].bgwhandle); + pcxt->worker[i].bgwhandle = NULL; + } + } + + /* Reset a few bits of fixed parallel state to a clean state. */ + fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED); + fps->workers_attached = 0; + fps->last_xlog_end = 0; + + /* Recreate error queues. */ + error_queue_space = + shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE); + for (i = 0; i < pcxt->nworkers; ++i) + { + char *start; + shm_mq *mq; + + start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE; + mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE); + shm_mq_set_receiver(mq, MyProc); + pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL); + } + } + /* Configure a worker. */ snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d", MyProcPid); @@ -416,8 +459,11 @@ LaunchParallelWorkers(ParallelContext *pcxt) if (!any_registrations_failed && RegisterDynamicBackgroundWorker(&worker, &pcxt->worker[i].bgwhandle)) + { shm_mq_set_handle(pcxt->worker[i].error_mqh, pcxt->worker[i].bgwhandle); + pcxt->nworkers_launched++; + } else { /* diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index b029c1e..57635c8 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -35,6 +35,7 @@ typedef struct ParallelContext dlist_node node; SubTransactionId subid; int nworkers; + int nworkers_launched; parallel_worker_main_type entrypoint; char *library_name; char *function_name; -- 2.3.8 (Apple Git-58)
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers