On Fri, Feb 6, 2015 at 2:13 PM, Robert Haas <robertmh...@gmail.com> wrote:
> My first comment here is that I think we should actually teach
> heapam.c about parallelism.

I coded this up; see attached.  I'm also attaching an updated version
of the parallel count code revised to use this API.  It's now called
"parallel_count" rather than "parallel_dummy" and I removed some
stupid stuff from it.  I'm curious to see what other people think, but
this seems much cleaner to me.  With the old approach, the
parallel-count code was duplicating some of the guts of heapam.c and
dropping the rest on the floor; now it just asks for a parallel scan
and away it goes.  Similarly, if your parallel-seqscan patch wanted to
scan block-by-block rather than splitting the relation into equal
parts, or if it wanted to participate in the synchronized-seqscan
stuff, there was no clean way to do that.  With this approach, those
decisions are - as they quite properly should be - isolated within
heapam.c, rather than creeping into the executor.

(These patches should be applied over parallel-mode-v4.patch.)

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
commit 096b3d5bdb4df5de095104fd3f58efa97e08a2ff
Author: Robert Haas <rhaas@postgresql.org>
Date:   Fri Feb 6 21:19:40 2015 -0500

    Support parallel heap scans.

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 50bede8..abfe8c2 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -62,6 +62,7 @@
 #include "storage/predicate.h"
 #include "storage/procarray.h"
 #include "storage/smgr.h"
+#include "storage/spin.h"
 #include "storage/standby.h"
 #include "utils/datum.h"
 #include "utils/inval.h"
@@ -79,8 +80,10 @@ bool		synchronize_seqscans = true;
 static HeapScanDesc heap_beginscan_internal(Relation relation,
 						Snapshot snapshot,
 						int nkeys, ScanKey key,
+						ParallelHeapScanDesc parallel_scan,
 						bool allow_strat, bool allow_sync,
 						bool is_bitmapscan, bool temp_snap);
+static BlockNumber heap_parallelscan_nextpage(ParallelHeapScanDesc);
 static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
 					TransactionId xid, CommandId cid, int options);
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
@@ -221,7 +224,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool is_rescan)
 	 * results for a non-MVCC snapshot, the caller must hold some higher-level
 	 * lock that ensures the interesting tuple(s) won't change.)
 	 */
-	scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
+	if (scan->rs_parallel != NULL)
+		scan->rs_nblocks = scan->rs_parallel->phs_nblocks;
+	else
+		scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
 
 	/*
 	 * If the table is large relative to NBuffers, use a bulk-read access
@@ -480,7 +486,18 @@ heapgettup(HeapScanDesc scan,
 				tuple->t_data = NULL;
 				return;
 			}
-			page = scan->rs_startblock; /* first page */
+			if (scan->rs_parallel != NULL)
+			{
+				page = heap_parallelscan_nextpage(scan->rs_parallel);
+				if (page >= scan->rs_nblocks)
+				{
+					Assert(!BufferIsValid(scan->rs_cbuf));
+					tuple->t_data = NULL;
+					return;
+				}
+			}
+			else
+				page = scan->rs_startblock; /* first page */
 			heapgetpage(scan, page);
 			lineoff = FirstOffsetNumber;		/* first offnum */
 			scan->rs_inited = true;
@@ -503,6 +520,9 @@ heapgettup(HeapScanDesc scan,
 	}
 	else if (backward)
 	{
+		/* backward parallel scan not supported */
+		Assert(scan->rs_parallel == NULL);
+
 		if (!scan->rs_inited)
 		{
 			/*
@@ -655,11 +675,19 @@ heapgettup(HeapScanDesc scan,
 		}
 		else
 		{
-			page++;
-			if (page >= scan->rs_nblocks)
-				page = 0;
-			finished = (page == scan->rs_startblock) ||
-				(scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false);
+			if (scan->rs_parallel != NULL)
+			{
+				page = heap_parallelscan_nextpage(scan->rs_parallel);
+				finished = (page >= scan->rs_nblocks);
+			}
+			else
+			{
+				page++;
+				if (page >= scan->rs_nblocks)
+					page = 0;
+				finished = (page == scan->rs_startblock) ||
+					(scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false);
+			}
 
 			/*
 			 * Report our new scan position for synchronization purposes. We
@@ -757,7 +785,18 @@ heapgettup_pagemode(HeapScanDesc scan,
 				tuple->t_data = NULL;
 				return;
 			}
-			page = scan->rs_startblock; /* first page */
+			if (scan->rs_parallel != NULL)
+			{
+				page = heap_parallelscan_nextpage(scan->rs_parallel);
+				if (page >= scan->rs_nblocks)
+				{
+					Assert(!BufferIsValid(scan->rs_cbuf));
+					tuple->t_data = NULL;
+					return;
+				}
+			}
+			else
+				page = scan->rs_startblock; /* first page */
 			heapgetpage(scan, page);
 			lineindex = 0;
 			scan->rs_inited = true;
@@ -777,6 +816,9 @@ heapgettup_pagemode(HeapScanDesc scan,
 	}
 	else if (backward)
 	{
+		/* backward parallel scan not supported */
+		Assert(scan->rs_parallel == NULL);
+
 		if (!scan->rs_inited)
 		{
 			/*
@@ -918,11 +960,19 @@ heapgettup_pagemode(HeapScanDesc scan,
 		}
 		else
 		{
-			page++;
-			if (page >= scan->rs_nblocks)
-				page = 0;
-			finished = (page == scan->rs_startblock) ||
-				(scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false);
+			if (scan->rs_parallel != NULL)
+			{
+				page = heap_parallelscan_nextpage(scan->rs_parallel);
+				finished = (page >= scan->rs_nblocks);
+			}
+			else
+			{
+				page++;
+				if (page >= scan->rs_nblocks)
+					page = 0;
+				finished = (page == scan->rs_startblock) ||
+					(scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false);
+			}
 
 			/*
 			 * Report our new scan position for synchronization purposes. We
@@ -1303,7 +1353,7 @@ HeapScanDesc
 heap_beginscan(Relation relation, Snapshot snapshot,
 			   int nkeys, ScanKey key)
 {
-	return heap_beginscan_internal(relation, snapshot, nkeys, key,
+	return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
 								   true, true, false, false);
 }
 
@@ -1313,7 +1363,7 @@ heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
 	Oid			relid = RelationGetRelid(relation);
 	Snapshot	snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));
 
-	return heap_beginscan_internal(relation, snapshot, nkeys, key,
+	return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
 								   true, true, false, true);
 }
 
@@ -1322,7 +1372,7 @@ heap_beginscan_strat(Relation relation, Snapshot snapshot,
 					 int nkeys, ScanKey key,
 					 bool allow_strat, bool allow_sync)
 {
-	return heap_beginscan_internal(relation, snapshot, nkeys, key,
+	return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
 								   allow_strat, allow_sync, false, false);
 }
 
@@ -1330,13 +1380,14 @@ HeapScanDesc
 heap_beginscan_bm(Relation relation, Snapshot snapshot,
 				  int nkeys, ScanKey key)
 {
-	return heap_beginscan_internal(relation, snapshot, nkeys, key,
+	return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
 								   false, false, true, false);
 }
 
 static HeapScanDesc
 heap_beginscan_internal(Relation relation, Snapshot snapshot,
 						int nkeys, ScanKey key,
+						ParallelHeapScanDesc parallel_scan,
 						bool allow_strat, bool allow_sync,
 						bool is_bitmapscan, bool temp_snap)
 {
@@ -1364,6 +1415,7 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot,
 	scan->rs_allow_strat = allow_strat;
 	scan->rs_allow_sync = allow_sync;
 	scan->rs_temp_snap = temp_snap;
+	scan->rs_parallel = parallel_scan;
 
 	/*
 	 * we can use page-at-a-time mode if it's an MVCC-safe snapshot
@@ -1457,6 +1509,79 @@ heap_endscan(HeapScanDesc scan)
 }
 
 /* ----------------
+ *		heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc
+ *
+ *		Sadly, this doesn't reduce to a constant, because the size required
+ *		to serialize the snapshot can vary.
+ * ----------------
+ */
+Size
+heap_parallelscan_estimate(Snapshot snapshot)
+{
+	return add_size(offsetof(ParallelHeapScanDescData, phs_snapshot_data),
+					EstimateSnapshotSpace(snapshot));
+}
+
+/* ----------------
+ *		heap_parallelscan_initialize - initialize ParallelHeapScanDesc
+ *
+ *		The target must point to at least as many bytes of shared memory as
+ *		returned by heap_parallelscan_estimate.  Call this just once in the
+ *		leader; then each participating process attaches via heap_beginscan_parallel.
+ * ----------------
+ */
+void
+heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
+							 Snapshot snapshot)
+{
+	target->phs_relid = RelationGetRelid(relation);
+	target->phs_nblocks = RelationGetNumberOfBlocks(relation);
+	SpinLockInit(&target->phs_mutex);
+	target->phs_cblock = 0;
+	SerializeSnapshot(snapshot, target->phs_snapshot_data);
+}
+/* ----------------
+ *		heap_parallelscan_nextpage - get the next page to scan
+ *
+ *		A return value greater than or equal to the number of blocks to be
+ *		scanned (including InvalidBlockNumber) indicates end of scan.  Note that
+ *		other backends could still be scanning a page they grabbed earlier.
+ * ----------------
+ */
+static BlockNumber
+heap_parallelscan_nextpage(ParallelHeapScanDesc parallel_scan)
+{
+	BlockNumber	page = InvalidBlockNumber;
+
+	/* we treat InvalidBlockNumber specially here to avoid overflow */
+	SpinLockAcquire(&parallel_scan->phs_mutex);
+	if (parallel_scan->phs_cblock != InvalidBlockNumber)
+		page = parallel_scan->phs_cblock++;
+	SpinLockRelease(&parallel_scan->phs_mutex);
+
+	return page;
+}
+
+/* ----------------
+ *		heap_beginscan_parallel - join a parallel scan
+ *
+ *		Caller must hold a suitable lock on the correct relation.
+ * ----------------
+ */
+HeapScanDesc
+heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
+{
+	Snapshot		snapshot;
+
+	Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
+	snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
+	RegisterSnapshot(snapshot);
+
+	return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
+								   true, true, false, true);
+}
+
+/* ----------------
  *		heap_getnext	- retrieve next tuple in scan
  *
  *		Fix to work with index relations.
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 939d93d..fb2b5f0 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -95,8 +95,9 @@ extern Relation heap_openrv_extended(const RangeVar *relation,
 
 #define heap_close(r,l)  relation_close(r,l)
 
-/* struct definition appears in relscan.h */
+/* struct definitions appear in relscan.h */
 typedef struct HeapScanDescData *HeapScanDesc;
+typedef struct ParallelHeapScanDescData *ParallelHeapScanDesc;
 
 /*
  * HeapScanIsValid
@@ -119,6 +120,11 @@ extern void heap_rescan(HeapScanDesc scan, ScanKey key);
 extern void heap_endscan(HeapScanDesc scan);
 extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction);
 
+extern Size heap_parallelscan_estimate(Snapshot snapshot);
+extern void heap_parallelscan_initialize(ParallelHeapScanDesc target,
+							 Relation relation, Snapshot snapshot);
+extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc);
+
 extern bool heap_fetch(Relation relation, Snapshot snapshot,
 		   HeapTuple tuple, Buffer *userbuf, bool keep_buf,
 		   Relation stats_relation);
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 9bb6362..f459020 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -20,6 +20,15 @@
 #include "access/itup.h"
 #include "access/tupdesc.h"
 
+/* Struct for parallel scan setup */
+typedef struct ParallelHeapScanDescData
+{
+	Oid			phs_relid;
+	BlockNumber	phs_nblocks;
+	slock_t		phs_mutex;
+	BlockNumber phs_cblock;
+	char		phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
+}	ParallelHeapScanDescData;
 
 typedef struct HeapScanDescData
 {
@@ -48,6 +57,7 @@ typedef struct HeapScanDescData
 	BlockNumber rs_cblock;		/* current block # in scan, if any */
 	Buffer		rs_cbuf;		/* current buffer in scan, if any */
 	/* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */
+	ParallelHeapScanDesc rs_parallel; /* parallel scan information */
 
 	/* these fields only used in page-at-a-time mode and for bitmap scans */
 	int			rs_cindex;		/* current tuple's index in vistuples */
commit 8d6ad4e1551252e17b7d7609f42f7a24921a2a31
Author: Robert Haas <rhaas@postgresql.org>
Date:   Fri Jan 30 08:39:22 2015 -0500

    contrib/parallel_count, now using heap_beginscan_parallel

diff --git a/contrib/parallel_count/Makefile b/contrib/parallel_count/Makefile
new file mode 100644
index 0000000..221c569
--- /dev/null
+++ b/contrib/parallel_count/Makefile
@@ -0,0 +1,19 @@
+MODULE_big = parallel_count
+OBJS = parallel_count.o $(WIN32RES)
+PGFILEDESC = "parallel_count - simple parallel tuple counter"
+
+EXTENSION = parallel_count
+DATA = parallel_count--1.0.sql
+
+REGRESS = parallel_count
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/parallel_count
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/parallel_count/parallel_count--1.0.sql b/contrib/parallel_count/parallel_count--1.0.sql
new file mode 100644
index 0000000..a8a6266
--- /dev/null
+++ b/contrib/parallel_count/parallel_count--1.0.sql
@@ -0,0 +1,7 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION parallel_count" to load this file. \quit
+
+CREATE FUNCTION parallel_count(rel pg_catalog.regclass,
+							  nworkers pg_catalog.int4)
+    RETURNS pg_catalog.int8 STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/contrib/parallel_count/parallel_count.c b/contrib/parallel_count/parallel_count.c
new file mode 100644
index 0000000..06a5ec3
--- /dev/null
+++ b/contrib/parallel_count/parallel_count.c
@@ -0,0 +1,154 @@
+/*--------------------------------------------------------------------------
+ *
+ * parallel_count.c
+ *		simple parallel tuple counter
+ *
+ * Copyright (C) 2013-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/parallel_count/parallel_count.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/parallel.h"
+#include "access/relscan.h"
+#include "access/xact.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+#include "utils/tqual.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(parallel_count);
+
+#define		KEY_SCAN			1
+#define		KEY_RESULT			2
+
+void		_PG_init(void);
+void		count_worker_main(dsm_segment *seg, shm_toc *toc);
+
+static void count_helper(Relation rel, ParallelHeapScanDesc scan,
+						 int64 *result);
+
+Datum
+parallel_count(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	int32		nworkers = PG_GETARG_INT32(1);
+	int32		i;
+	bool		already_in_parallel_mode = IsInParallelMode();
+	ParallelContext *pcxt;
+	Snapshot	snapshot;
+	Size		pscan_size;
+	ParallelHeapScanDesc pscan;
+	Relation	rel;
+	int64	   *result = NULL;
+	int64		total = 0;
+
+	if (nworkers < 0)
+		ereport(ERROR,
+				(errmsg("number of parallel workers must be non-negative")));
+
+	rel = relation_open(relid, AccessShareLock);
+
+	if (!already_in_parallel_mode)
+		EnterParallelMode();
+
+	pcxt = CreateParallelContextForExtension("parallel_count",
+											 "count_worker_main",
+											 nworkers);
+
+	snapshot = GetActiveSnapshot();
+	pscan_size = heap_parallelscan_estimate(snapshot);
+	shm_toc_estimate_chunk(&pcxt->estimator, pscan_size);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	if (nworkers > 0)
+	{
+		shm_toc_estimate_chunk(&pcxt->estimator, nworkers * sizeof(int64));
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+
+	InitializeParallelDSM(pcxt);
+
+	pscan = shm_toc_allocate(pcxt->toc, pscan_size);
+	heap_parallelscan_initialize(pscan, rel, snapshot);
+	shm_toc_insert(pcxt->toc, KEY_SCAN, pscan);
+	if (nworkers > 0)
+	{
+		result = shm_toc_allocate(pcxt->toc, nworkers * sizeof(int64));
+		shm_toc_insert(pcxt->toc, KEY_RESULT, result);
+	}
+
+	LaunchParallelWorkers(pcxt);
+
+	/* here's where we do the "real work" ... */
+	count_helper(rel, pscan, &total);
+
+	WaitForParallelWorkersToFinish(pcxt);
+
+	for (i = 0; i < nworkers; ++i)
+		total += result[i];
+
+	DestroyParallelContext(pcxt);
+
+	relation_close(rel, AccessShareLock);
+
+	if (!already_in_parallel_mode)
+		ExitParallelMode();
+
+	PG_RETURN_INT64(total);
+}
+
+void
+count_worker_main(dsm_segment *seg, shm_toc *toc)
+{
+	ParallelHeapScanDesc	pscan;
+	int64	   *result;
+	Relation	rel;
+
+	pscan = shm_toc_lookup(toc, KEY_SCAN);
+	Assert(pscan != NULL);
+
+	result = shm_toc_lookup(toc, KEY_RESULT);
+	Assert(result != NULL);
+
+	rel = relation_open(pscan->phs_relid, AccessShareLock);
+	count_helper(rel, pscan, &result[ParallelWorkerNumber]);
+	relation_close(rel, AccessShareLock);
+}
+
+static void
+count_helper(Relation rel, ParallelHeapScanDesc pscan, int64 *result)
+{
+	int64		ntuples = 0;
+	HeapScanDesc	scan;
+
+	scan = heap_beginscan_parallel(rel, pscan);
+
+	for (;;)
+	{
+		HeapTuple	tuple;
+
+		CHECK_FOR_INTERRUPTS();
+
+		tuple = heap_getnext(scan, ForwardScanDirection);
+		if (!HeapTupleIsValid(tuple))
+			break;
+
+		++ntuples;
+	}
+
+	heap_endscan(scan);
+
+	*result = ntuples;
+	elog(NOTICE, "PID %d counted " INT64_FORMAT " tuples", MyProcPid, ntuples);
+}
diff --git a/contrib/parallel_count/parallel_count.control b/contrib/parallel_count/parallel_count.control
new file mode 100644
index 0000000..76f332d
--- /dev/null
+++ b/contrib/parallel_count/parallel_count.control
@@ -0,0 +1,4 @@
+comment = 'simple parallel tuple counter'
+default_version = '1.0'
+module_pathname = '$libdir/parallel_count'
+relocatable = true
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to