On Fri, Feb 6, 2015 at 2:13 PM, Robert Haas <robertmh...@gmail.com> wrote: > My first comment here is that I think we should actually teach > heapam.c about parallelism.
I coded this up; see attached. I'm also attaching an updated version of the parallel count code revised to use this API. It's now called "parallel_count" rather than "parallel_dummy" and I removed some stupid stuff from it. I'm curious to see what other people think, but this seems much cleaner to me. With the old approach, the parallel-count code was duplicating some of the guts of heapam.c and dropping the rest on the floor; now it just asks for a parallel scan and away it goes. Similarly, if your parallel-seqscan patch wanted to scan block-by-block rather than splitting the relation into equal parts, or if it wanted to participate in the synchronized-seqscan stuff, there was no clean way to do that. With this approach, those decisions are - as they quite properly should be - isolated within heapam.c, rather than creeping into the executor. (These patches should be applied over parallel-mode-v4.patch.) -- Robert Haas EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
commit 096b3d5bdb4df5de095104fd3f58efa97e08a2ff Author: Robert Haas <rhaas@postgresql.org> Date: Fri Feb 6 21:19:40 2015 -0500 Support parallel heap scans. diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 50bede8..abfe8c2 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -62,6 +62,7 @@ #include "storage/predicate.h" #include "storage/procarray.h" #include "storage/smgr.h" +#include "storage/spin.h" #include "storage/standby.h" #include "utils/datum.h" #include "utils/inval.h" @@ -79,8 +80,10 @@ bool synchronize_seqscans = true; static HeapScanDesc heap_beginscan_internal(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, + ParallelHeapScanDesc parallel_scan, bool allow_strat, bool allow_sync, bool is_bitmapscan, bool temp_snap); +static BlockNumber heap_parallelscan_nextpage(ParallelHeapScanDesc); static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options); static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, @@ -221,7 +224,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool is_rescan) * results for a non-MVCC snapshot, the caller must hold some higher-level * lock that ensures the interesting tuple(s) won't change.) 
*/ - scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd); + if (scan->rs_parallel != NULL) + scan->rs_nblocks = scan->rs_parallel->phs_nblocks; + else + scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd); /* * If the table is large relative to NBuffers, use a bulk-read access @@ -480,7 +486,18 @@ heapgettup(HeapScanDesc scan, tuple->t_data = NULL; return; } - page = scan->rs_startblock; /* first page */ + if (scan->rs_parallel != NULL) + { + page = heap_parallelscan_nextpage(scan->rs_parallel); + if (page >= scan->rs_nblocks) + { + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + return; + } + } + else + page = scan->rs_startblock; /* first page */ heapgetpage(scan, page); lineoff = FirstOffsetNumber; /* first offnum */ scan->rs_inited = true; @@ -503,6 +520,9 @@ heapgettup(HeapScanDesc scan, } else if (backward) { + /* backward parallel scan not supported */ + Assert(scan->rs_parallel == NULL); + if (!scan->rs_inited) { /* @@ -655,11 +675,19 @@ heapgettup(HeapScanDesc scan, } else { - page++; - if (page >= scan->rs_nblocks) - page = 0; - finished = (page == scan->rs_startblock) || - (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false); + if (scan->rs_parallel != NULL) + { + page = heap_parallelscan_nextpage(scan->rs_parallel); + finished = (page >= scan->rs_nblocks); + } + else + { + page++; + if (page >= scan->rs_nblocks) + page = 0; + finished = (page == scan->rs_startblock) || + (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false); + } /* * Report our new scan position for synchronization purposes. 
We @@ -757,7 +785,18 @@ heapgettup_pagemode(HeapScanDesc scan, tuple->t_data = NULL; return; } - page = scan->rs_startblock; /* first page */ + if (scan->rs_parallel != NULL) + { + page = heap_parallelscan_nextpage(scan->rs_parallel); + if (page >= scan->rs_nblocks) + { + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + return; + } + } + else + page = scan->rs_startblock; /* first page */ heapgetpage(scan, page); lineindex = 0; scan->rs_inited = true; @@ -777,6 +816,9 @@ heapgettup_pagemode(HeapScanDesc scan, } else if (backward) { + /* backward parallel scan not supported */ + Assert(scan->rs_parallel == NULL); + if (!scan->rs_inited) { /* @@ -918,11 +960,19 @@ heapgettup_pagemode(HeapScanDesc scan, } else { - page++; - if (page >= scan->rs_nblocks) - page = 0; - finished = (page == scan->rs_startblock) || - (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false); + if (scan->rs_parallel != NULL) + { + page = heap_parallelscan_nextpage(scan->rs_parallel); + finished = (page >= scan->rs_nblocks); + } + else + { + page++; + if (page >= scan->rs_nblocks) + page = 0; + finished = (page == scan->rs_startblock) || + (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false); + } /* * Report our new scan position for synchronization purposes. 
We @@ -1303,7 +1353,7 @@ HeapScanDesc heap_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key) { - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, true, true, false, false); } @@ -1313,7 +1363,7 @@ heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key) Oid relid = RelationGetRelid(relation); Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid)); - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, true, true, false, true); } @@ -1322,7 +1372,7 @@ heap_beginscan_strat(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, bool allow_strat, bool allow_sync) { - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, allow_strat, allow_sync, false, false); } @@ -1330,13 +1380,14 @@ HeapScanDesc heap_beginscan_bm(Relation relation, Snapshot snapshot, int nkeys, ScanKey key) { - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, false, false, true, false); } static HeapScanDesc heap_beginscan_internal(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, + ParallelHeapScanDesc parallel_scan, bool allow_strat, bool allow_sync, bool is_bitmapscan, bool temp_snap) { @@ -1364,6 +1415,7 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot, scan->rs_allow_strat = allow_strat; scan->rs_allow_sync = allow_sync; scan->rs_temp_snap = temp_snap; + scan->rs_parallel = parallel_scan; /* * we can use page-at-a-time mode if it's an MVCC-safe snapshot @@ -1457,6 +1509,79 @@ heap_endscan(HeapScanDesc scan) } /* ---------------- + * heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc + * + * Sadly, this doesn't reduce to a constant, because the size required + * to serialize the 
snapshot can vary. + * ---------------- + */ +Size +heap_parallelscan_estimate(Snapshot snapshot) +{ + return add_size(offsetof(ParallelHeapScanDescData, phs_snapshot_data), + EstimateSnapshotSpace(snapshot)); +} + +/* ---------------- + * heap_parallelscan_initialize - initialize ParallelHeapScanDesc + * + * Must allow as many bytes of shared memory as returned by + * heap_parallelscan_estimate. Call this just once in the leader + * process; then, individual workers attach via heap_beginscan_parallel. + * ---------------- + */ +void +heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation, + Snapshot snapshot) +{ + target->phs_relid = RelationGetRelid(relation); + target->phs_nblocks = RelationGetNumberOfBlocks(relation); + SpinLockInit(&target->phs_mutex); + target->phs_cblock = 0; + SerializeSnapshot(snapshot, target->phs_snapshot_data); +} +/* ---------------- + * heap_parallelscan_nextpage - get the next page to scan + * + * A return value larger than the number of blocks to be scanned + * indicates end of scan. Note, however, that other backends could still + * be scanning if they grabbed a page to scan and aren't done with it yet. + * ---------------- + */ +static BlockNumber +heap_parallelscan_nextpage(ParallelHeapScanDesc parallel_scan) +{ + BlockNumber page = InvalidBlockNumber; + + /* we treat InvalidBlockNumber specially here to avoid overflow */ + SpinLockAcquire(&parallel_scan->phs_mutex); + if (parallel_scan->phs_cblock != InvalidBlockNumber) + page = parallel_scan->phs_cblock++; + SpinLockRelease(&parallel_scan->phs_mutex); + + return page; +} + +/* ---------------- + * heap_beginscan_parallel - join a parallel scan + * + * Caller must hold a suitable lock on the correct relation. 
+ * ---------------- + */ +HeapScanDesc +heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan) +{ + Snapshot snapshot; + + Assert(RelationGetRelid(relation) == parallel_scan->phs_relid); + snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data); + RegisterSnapshot(snapshot); + + return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan, + true, true, false, true); +} + +/* ---------------- * heap_getnext - retrieve next tuple in scan * * Fix to work with index relations. diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 939d93d..fb2b5f0 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -95,8 +95,9 @@ extern Relation heap_openrv_extended(const RangeVar *relation, #define heap_close(r,l) relation_close(r,l) -/* struct definition appears in relscan.h */ +/* struct definitions appear in relscan.h */ typedef struct HeapScanDescData *HeapScanDesc; +typedef struct ParallelHeapScanDescData *ParallelHeapScanDesc; /* * HeapScanIsValid @@ -119,6 +120,11 @@ extern void heap_rescan(HeapScanDesc scan, ScanKey key); extern void heap_endscan(HeapScanDesc scan); extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction); +extern Size heap_parallelscan_estimate(Snapshot snapshot); +extern void heap_parallelscan_initialize(ParallelHeapScanDesc target, + Relation relation, Snapshot snapshot); +extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc); + extern bool heap_fetch(Relation relation, Snapshot snapshot, HeapTuple tuple, Buffer *userbuf, bool keep_buf, Relation stats_relation); diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index 9bb6362..f459020 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -20,6 +20,15 @@ #include "access/itup.h" #include "access/tupdesc.h" +/* Struct for parallel scan setup */ +typedef struct ParallelHeapScanDescData +{ + Oid phs_relid; + BlockNumber phs_nblocks; + 
slock_t phs_mutex; + BlockNumber phs_cblock; + char phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER]; +} ParallelHeapScanDescData; typedef struct HeapScanDescData { @@ -48,6 +57,7 @@ typedef struct HeapScanDescData BlockNumber rs_cblock; /* current block # in scan, if any */ Buffer rs_cbuf; /* current buffer in scan, if any */ /* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */ + ParallelHeapScanDesc rs_parallel; /* parallel scan information */ /* these fields only used in page-at-a-time mode and for bitmap scans */ int rs_cindex; /* current tuple's index in vistuples */
commit 8d6ad4e1551252e17b7d7609f42f7a24921a2a31 Author: Robert Haas <rhaas@postgresql.org> Date: Fri Jan 30 08:39:22 2015 -0500 contrib/parallel_count, now using heap_parallel_beginscan diff --git a/contrib/parallel_count/Makefile b/contrib/parallel_count/Makefile new file mode 100644 index 0000000..221c569 --- /dev/null +++ b/contrib/parallel_count/Makefile @@ -0,0 +1,19 @@ +MODULE_big = parallel_count +OBJS = parallel_count.o $(WIN32RES) +PGFILEDESC = "parallel_count - simple parallel tuple counter" + +EXTENSION = parallel_count +DATA = parallel_count--1.0.sql + +REGRESS = parallel_count + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/parallel_count +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/parallel_count/parallel_count--1.0.sql b/contrib/parallel_count/parallel_count--1.0.sql new file mode 100644 index 0000000..a8a6266 --- /dev/null +++ b/contrib/parallel_count/parallel_count--1.0.sql @@ -0,0 +1,7 @@ +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION parallel_count" to load this file. 
\quit + +CREATE FUNCTION parallel_count(rel pg_catalog.regclass, + nworkers pg_catalog.int4) + RETURNS pg_catalog.int8 STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/contrib/parallel_count/parallel_count.c b/contrib/parallel_count/parallel_count.c new file mode 100644 index 0000000..06a5ec3 --- /dev/null +++ b/contrib/parallel_count/parallel_count.c @@ -0,0 +1,154 @@ +/*-------------------------------------------------------------------------- + * + * parallel_count.c + * simple parallel tuple counter + * + * Copyright (C) 2013-2014, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/parallel_count/parallel_count.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/heapam.h" +#include "access/parallel.h" +#include "access/relscan.h" +#include "access/xact.h" +#include "fmgr.h" +#include "miscadmin.h" +#include "storage/bufmgr.h" +#include "storage/spin.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/snapmgr.h" +#include "utils/tqual.h" + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(parallel_count); + +#define KEY_SCAN 1 +#define KEY_RESULT 2 + +void _PG_init(void); +void count_worker_main(dsm_segment *seg, shm_toc *toc); + +static void count_helper(Relation rel, ParallelHeapScanDesc scan, + int64 *result); + +Datum +parallel_count(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + int32 nworkers = PG_GETARG_INT32(1); + int32 i; + bool already_in_parallel_mode = IsInParallelMode(); + ParallelContext *pcxt; + Snapshot snapshot; + Size pscan_size; + ParallelHeapScanDesc pscan; + Relation rel; + int64 *result = NULL; + int64 total = 0; + + if (nworkers < 0) + ereport(ERROR, + (errmsg("number of parallel workers must be non-negative"))); + + rel = relation_open(relid, AccessShareLock); + + if (!already_in_parallel_mode) + EnterParallelMode(); + + pcxt = CreateParallelContextForExtension("parallel_count", + "count_worker_main", + 
nworkers); + + snapshot = GetActiveSnapshot(); + pscan_size = heap_parallelscan_estimate(snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, pscan_size); + shm_toc_estimate_keys(&pcxt->estimator, 1); + if (nworkers > 0) + { + shm_toc_estimate_chunk(&pcxt->estimator, nworkers * sizeof(int64)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + + InitializeParallelDSM(pcxt); + + pscan = shm_toc_allocate(pcxt->toc, pscan_size); + heap_parallelscan_initialize(pscan, rel, snapshot); + shm_toc_insert(pcxt->toc, KEY_SCAN, pscan); + if (nworkers > 0) + { + result = shm_toc_allocate(pcxt->toc, nworkers * sizeof(int64)); + shm_toc_insert(pcxt->toc, KEY_RESULT, result); + } + + LaunchParallelWorkers(pcxt); + + /* here's where we do the "real work" ... */ + count_helper(rel, pscan, &total); + + WaitForParallelWorkersToFinish(pcxt); + + for (i = 0; i < nworkers; ++i) + total += result[i]; + + DestroyParallelContext(pcxt); + + relation_close(rel, AccessShareLock); + + if (!already_in_parallel_mode) + ExitParallelMode(); + + PG_RETURN_INT64(total); +} + +void +count_worker_main(dsm_segment *seg, shm_toc *toc) +{ + ParallelHeapScanDesc pscan; + int64 *result; + Relation rel; + + pscan = shm_toc_lookup(toc, KEY_SCAN); + Assert(pscan != NULL); + + result = shm_toc_lookup(toc, KEY_RESULT); + Assert(result != NULL); + + rel = relation_open(pscan->phs_relid, AccessShareLock); + count_helper(rel, pscan, &result[ParallelWorkerNumber]); + relation_close(rel, AccessShareLock); +} + +static void +count_helper(Relation rel, ParallelHeapScanDesc pscan, int64 *result) +{ + int64 ntuples = 0; + HeapScanDesc scan; + + scan = heap_beginscan_parallel(rel, pscan); + + for (;;) + { + HeapTuple tuple; + + CHECK_FOR_INTERRUPTS(); + + tuple = heap_getnext(scan, ForwardScanDirection); + if (!HeapTupleIsValid(tuple)) + break; + + ++ntuples; + } + + heap_endscan(scan); + + *result = ntuples; + elog(NOTICE, "PID %d counted " INT64_FORMAT " tuples", MyProcPid, ntuples); +} diff --git 
a/contrib/parallel_count/parallel_count.control b/contrib/parallel_count/parallel_count.control new file mode 100644 index 0000000..76f332d --- /dev/null +++ b/contrib/parallel_count/parallel_count.control @@ -0,0 +1,4 @@ +comment = 'simple parallel tuple counter' +default_version = '1.0' +module_pathname = '$libdir/parallel_count' +relocatable = true
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers