On Fri, 18 Mar 2016 20:40:16 +0300
Dmitry Ivanov <d.iva...@postgrespro.ru> wrote:

> - Currently there's no documentation regarding parallel gin build
> feature and provided GUC variables.

> You could replace direct access to 'rd_id' field with the
> RelationGetRelid macro.

> Parameter 'r' is unused, you could remove it.

> Some of the functions and pieces of code that you've added do not
> comply to the formatting conventions, e. g.

On Fri, 18 Mar 2016 11:18:46 -0400
Robert Haas <robertmh...@gmail.com> wrote:

> Strong +1.  The pool of background workers is necessarily quite
> limited and you can't just gobble them up.  I'm not saying that it's
> absolutely essential that the leader can also participate, but saying
> that 1 active leader + 1 worker is only 2% faster than 1 passive
> leader + 2 workers is not comparing apples to apples.

On Fri, 18 Mar 2016 09:12:59 +0530
Amit Kapila <amit.kapil...@gmail.com> wrote:

> If I understand the above data correctly, then it seems to indicate
> that majority of the work is done in processing the data, so I think
> it should be better if master and worker both can work together.

Thank you for the support and feedback! Fixed all of these in the new
version. Also added new queries to 'gin' regression test, which
immediately helped me catch one silly bug. The new patch is
attached.

Regards,

Constantin S. Pan
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 7695ec1..7f2e302 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -6850,6 +6850,35 @@ dynamic_library_path = 'C:\tools\postgresql;H:\my_project\lib;$libdir'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-gin-parallel-workers" xreflabel="gin_parallel_workers">
+      <term><varname>gin_parallel_workers</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>gin_parallel_workers</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Upper limit of the number of parallel workers that should be used when
+        building GIN. Set to 0 to disable parallel GIN build.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry id="guc-gin-shared-mem" xreflabel="gin_shared_mem">
+      <term><varname>gin_shared_mem</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>gin_parallel_workers</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        The size of shared memory segment for parallel GIN buiding. The segment
+        is used to transfer partial results from workers for final merging.
+        Ignored if not using parallel GIN build.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
    </sect1>
diff --git a/src/backend/access/gin/README b/src/backend/access/gin/README
index fade0cb..4325d88 100644
--- a/src/backend/access/gin/README
+++ b/src/backend/access/gin/README
@@ -49,7 +49,7 @@ Features
   * Write-Ahead Logging (WAL).  (Recoverability from crashes.)
   * User-defined opclasses.  (The scheme is similar to GiST.)
   * Optimized index creation (Makes use of maintenance_work_mem to accumulate
-    postings in memory.)
+    postings in memory and gin_parallel_workers to speed the process up.)
   * Text search support via an opclass
   * Soft upper limit on the returned results set using a GUC variable:
     gin_fuzzy_search_limit
@@ -324,6 +324,24 @@ page-deletions safe; it stamps the deleted pages with an XID and keeps the
 deleted pages around with the right-link intact until all concurrent scans
 have finished.)
 
+Parallel Building
+-----------------
+
+The most expensive part of GIN index building is accumulating the rbtree. GIN
+supports using parallel workers which divide the work between each other. This
+speeds up the accumulating stage almost perfectly, but slows down the dumping
+of the result a little, since the backend now needs to merge lists that
+correspond to the same key but come from multiple workers.
+
+When it is time to build a GIN index on a relation, the backend checks the value of
+gin_parallel_workers config variable and tries to launch the corresponding
+number of parallel workers. The backend also sets up a shared memory segment of
+configurable size (gin_shared_mem). The workers scan the relation, dividing it
+by blocks, until they fill the maintenance_work_mem, then send the accumulated
+tree to the backend through the shared memory segment. The backend merges the
+trees and inserts the entries to the index. The process repeats until the
+relation is fully scanned.
+
 Compatibility
 -------------
 
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index cd21e0e..81ec5be 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -16,16 +16,69 @@
 
 #include "access/gin_private.h"
 #include "access/xloginsert.h"
+#include "access/parallel.h"
+#include "access/xact.h"
 #include "catalog/index.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
 #include "storage/smgr.h"
 #include "storage/indexfsm.h"
+#include "storage/spin.h"
+#include "utils/datum.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
+/* GUC parameter */
+int gin_parallel_workers = 0;
+int gin_shared_mem = 0;
 
-typedef struct
+#define KEY_TASK 42
+#define KEY_SHM_ORIGIN 43
+#define KEY_SHM_PER_WORKER 44
+#define GIN_BLOCKS_PER_WORKER 4
+
+/*
+ * The shmem message structure:
+ * Entry, Key, List
+ */
+
+typedef struct GinShmemEntry
+{
+	GinNullCategory category;
+	OffsetNumber attnum;
+	int nlist;
+} GinShmemEntry;
+
+typedef struct WorkerResult
+{
+	void *mq;
+	shm_mq_handle *mqh;
+	bool end_of_tree;
+	bool end_of_forest;
+
+	void *msg_body;
+	Size msg_len;
+	Datum key;
+	int skipkey;
+} WorkerResult;
+
+/*
+ * This structure describes the GIN build task for the parallel workers. We use
+ * OIDs here because workers are separate processes and pointers may become
+ * meaningless for them. The "lock" is used to protect the "scanned" and
+ * "reltuples" fields as the workers modify them.
+ */
+typedef struct GinBuildTask
+{
+	int		to_scan;
+	int		scanned;
+	slock_t	lock;
+	Oid		heap_oid;
+	Oid		index_oid;
+	double	reltuples;
+} GinBuildTask;
+
+typedef struct GinBuildState
 {
 	GinState	ginstate;
 	double		indtuples;
@@ -33,9 +86,16 @@ typedef struct
 	MemoryContext tmpCtx;
 	MemoryContext funcCtx;
 	BuildAccumulator accum;
+
+	ParallelContext *pcxt;
+	volatile GinBuildTask *task;
+	WorkerResult *results; /* NULL in workers, array in backend */
+
+	/* these only get used by workers */
+	shm_mq *mq;
+	shm_mq_handle *mqh;
 } GinBuildState;
 
-
 /*
  * Adds array of item pointers to tuple's posting list, or
  * creates posting tree and tuple pointing to tree in case
@@ -266,6 +326,359 @@ ginHeapTupleBulkInsert(GinBuildState *buildstate, OffsetNumber attnum,
 }
 
 static void
+ginDumpEntry(GinState *ginstate,
+			 shm_mq_handle *mqh,
+			 OffsetNumber attnum,
+			 Datum key,
+			 GinNullCategory category,
+			 ItemPointerData *list,
+			 int nlist)
+{
+	int keylen, listlen;
+
+	bool isnull;
+	Form_pg_attribute att;
+	GinShmemEntry e;
+
+	/*
+	 * The message consists of 2 or 3 parts. iovec allows us to send them as
+	 * one message though the parts are located at unrelated addresses.
+	 */
+	shm_mq_iovec iov[3];
+	int iovlen = 0;
+
+	char *buf = NULL;
+
+	e.category = category;
+	e.attnum = attnum;
+	e.nlist = nlist;
+
+	Assert(nlist > 0);
+
+	isnull = category == GIN_CAT_NULL_KEY;
+	att = ginstate->origTupdesc->attrs[attnum - 1];
+
+	if (e.category == GIN_CAT_NORM_KEY)
+	{
+		keylen = datumEstimateSpace(key, isnull, att->attbyval, att->attlen);
+		Assert(keylen > 0);
+		listlen = e.nlist * sizeof(ItemPointerData);
+	}
+	else
+		keylen = 0;
+
+	listlen = e.nlist * sizeof(ItemPointerData);
+
+	iov[iovlen].data = (char *)&e;
+	iov[iovlen++].len = sizeof(e);
+
+	if (keylen > 0)
+	{
+		char *cursor;
+		buf = palloc(keylen);
+		cursor = buf;
+		datumSerialize(key, isnull, att->attbyval, att->attlen, &cursor);
+		iov[iovlen].data = buf;
+		iov[iovlen++].len = keylen;
+	}
+
+	iov[iovlen].data = (char *)list;
+	iov[iovlen++].len = listlen;
+
+	if (shm_mq_sendv(mqh, iov, iovlen, false) != SHM_MQ_SUCCESS)
+		elog(ERROR,
+			 "worker %d failed to send a result entry to the backend",
+			 ParallelWorkerNumber);
+
+	if (buf)
+		pfree(buf);
+}
+
+static void
+ginSendTree(GinBuildState *buildstate)
+{
+	ItemPointerData *list;
+	Datum		key;
+	GinNullCategory category;
+	uint32		nlist;
+	OffsetNumber attnum;
+	MemoryContext oldCtx;
+
+	Assert(IsParallelWorker());
+	oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
+
+	ginBeginBAScan(&buildstate->accum);
+	while ((list = ginGetBAEntry(&buildstate->accum,
+							  &attnum, &key, &category, &nlist)) != NULL)
+	{
+		/* there could be many entries, so be willing to abort here */
+		CHECK_FOR_INTERRUPTS();
+
+		ginDumpEntry(&buildstate->ginstate,
+					 buildstate->mqh, attnum, key,
+					 category, list, nlist);
+	}
+
+	MemoryContextReset(buildstate->tmpCtx);
+	ginInitBA(&buildstate->accum);
+
+	/* send an empty message as an "end-of-tree" marker */
+	if (shm_mq_send(buildstate->mqh, 0, NULL, false) != SHM_MQ_SUCCESS)
+	{
+		elog(ERROR,
+			 "worker %d failed to send the end-of-tree marker to the backend",
+			 ParallelWorkerNumber);
+	}
+
+	MemoryContextSwitchTo(oldCtx);
+}
+/*
+ * Wait until all unfinished workers start dumping their trees.
+ * Return the number of trees to merge (except the backend's one).
+ */
+static int
+waitNextTree(GinBuildState *buildstate)
+{
+	int i;
+	int trees = 0;
+
+	int wnum = buildstate->pcxt ? buildstate->pcxt->nworkers_launched : 0;
+	for (i = 0; i < wnum; ++i)
+	{
+		WorkerResult *r = buildstate->results + i;
+
+		if (shm_mq_receive(r->mqh, (Size *)&r->msg_len, (void *)&r->msg_body, false) == SHM_MQ_SUCCESS)
+		{
+			r->end_of_tree = false;
+			trees++;
+		}
+		else
+		{
+			r->end_of_forest = true;
+		}
+	}
+	return trees;
+}
+
+typedef struct GinEntry
+{
+	struct GinEntry *next;
+	Datum			key;
+	GinNullCategory	category;
+	OffsetNumber	attnum;
+	ItemPointerData	*list;
+	uint32			nlist;
+} GinEntry;
+
+static bool
+getCandidate(GinBuildState *buildstate,
+			 int worker,
+			 GinEntry *backend_candidate,
+			 GinEntry *candidate)
+{
+	if (worker < 0)
+	{
+		if (!backend_candidate->list)
+			backend_candidate->list = ginGetBAEntry(&buildstate->accum,
+													&backend_candidate->attnum,
+													&backend_candidate->key, 
+													&backend_candidate->category, 
+													&backend_candidate->nlist);
+		if (!backend_candidate->list)
+			return false;
+		*candidate = *backend_candidate;
+	}
+	else
+	{
+		GinShmemEntry *shentry;
+		char *cursor;
+		WorkerResult *r = buildstate->results + worker;
+		Assert(worker >= 0);
+
+		if (r->end_of_forest || r->end_of_tree)
+			return false;
+
+		if (r->msg_len == 0) /* end-of-tree */
+		{
+			r->end_of_tree = true;
+			return false;
+		}
+
+		cursor = r->msg_body;
+		Assert(cursor != NULL);
+		shentry = (GinShmemEntry*)cursor;
+		cursor += sizeof(shentry);
+
+		if (r->skipkey)
+		{
+			/* the key has already been restored */
+			cursor += r->skipkey;
+		}
+		else
+		{
+			r->key = 0;
+			if (shentry->category == GIN_CAT_NORM_KEY)
+			{
+				bool isnull;
+				char *oldcursor = cursor;
+				r->key = datumRestore(&cursor, &isnull);
+				r->skipkey = cursor - oldcursor;
+				Assert(!isnull);
+				Assert(r->skipkey);
+			}
+		}
+		candidate->list = (ItemPointerData*)cursor;
+		candidate->nlist = shentry->nlist;
+		candidate->attnum = shentry->attnum;
+		candidate->key = r->key;
+		candidate->category = shentry->category;
+	}
+	return true;
+}
+
+/* Message consumed. Receive the next one, if not backend. */
+static void
+consumeCandidate(GinBuildState *buildstate, int worker, GinEntry *backend_candidate)
+{
+	if (worker < 0)
+		backend_candidate->list = NULL;
+	else
+	{
+		WorkerResult *r = buildstate->results + worker;
+		r->skipkey = 0;
+		if (shm_mq_receive(r->mqh, (Size *)&r->msg_len, (void *)&r->msg_body, false) != SHM_MQ_SUCCESS)
+			r->end_of_forest = true;
+	}
+}
+
+static GinEntry *
+pushEntry(GinEntry *stack)
+{
+	GinEntry *head = palloc(sizeof(GinEntry));
+	head->next = stack;
+	head->list = palloc(sizeof(ItemPointerData)); /* make ginMergeItemPointers happy */
+	head->nlist = 0;
+	return head;
+}
+
+static GinEntry *
+popEntry(GinEntry *stack)
+{
+	GinEntry *head = stack;
+	Assert(stack != NULL);
+	stack = stack->next;
+	pfree(head->list);
+	pfree(head);
+	return stack;
+}
+
+static int
+compare(GinBuildState *buildstate, GinEntry *a, GinEntry *b)
+{
+	return ginCompareAttEntries(&buildstate->ginstate,
+							    a->attnum, a->key, a->category,
+								b->attnum, b->key, b->category);
+}
+
+/* Merge the trees from all ready (but unfinished) workers (and from myself). */
+static void
+mergeTrees(GinBuildState *buildstate)
+{
+	GinEntry *entry = NULL;
+	GinEntry backend_candidate;
+	backend_candidate.list = NULL;
+
+	ginBeginBAScan(&buildstate->accum);
+
+	while (true)
+	{
+		int i;
+		bool merged = false;
+		int wnum = buildstate->pcxt ? buildstate->pcxt->nworkers_launched : 0;
+
+		/* -1 is used as the id of the backend */
+		for (i = -1; i < wnum; ++i)
+		{
+			GinEntry candidate;
+			int cmp;
+
+			if (!getCandidate(buildstate, i, &backend_candidate, &candidate))
+				continue;
+
+			cmp = -1;
+			if (entry != NULL)
+				cmp = compare(buildstate, &candidate, entry);
+
+			if (cmp > 0)
+			{
+				/* The candidate's key is greater, skip the worker. */
+				continue;
+			}
+
+			if (cmp < 0)
+			{
+				/*
+				 * The candidate's key is less than what we have on the stack.
+				 * Push a new entry onto the stack.
+				 */
+				entry = pushEntry(entry);
+				entry->key      = candidate.key;
+				entry->category = candidate.category;
+				entry->attnum   = candidate.attnum;
+			}
+
+			/* The key is less than or equal. Merge the item pointers. */
+			{
+				int newnlist;
+				ItemPointerData *oldlist = entry->list;
+				entry->list = ginMergeItemPointers(entry->list, entry->nlist,
+												   candidate.list, candidate.nlist,
+												   &newnlist);
+				entry->nlist = newnlist;
+				pfree(oldlist);
+			}
+
+			consumeCandidate(buildstate, i, &backend_candidate);
+			merged = true;
+		}
+
+		if (!merged)
+		{
+			/* Nothing merged. Insert the entry into the index and pop the stack. */
+			if (entry == NULL)
+			{
+				/* Also nothing to dump - we have finished. */
+				break;
+			}
+
+			ginEntryInsert(&buildstate->ginstate,
+						   entry->attnum, entry->key, entry->category,
+						   entry->list, entry->nlist,
+						   &buildstate->buildStats);
+			entry = popEntry(entry);
+		}
+	}
+
+	ginInitBA(&buildstate->accum);
+}
+
+static void
+ginDumpCommon(GinBuildState *buildstate, bool last)
+{
+	if (IsParallelWorker())
+		ginSendTree(buildstate);
+	else
+	{
+		int trees;
+		do
+		{
+			trees = waitNextTree(buildstate);
+			mergeTrees(buildstate);
+		} while (last && (trees > 0));
+	}
+}
+
+static void
 ginBuildCallback(Relation index, HeapTuple htup, Datum *values,
 				 bool *isnull, bool tupleIsAlive, void *state)
 {
@@ -282,53 +695,239 @@ ginBuildCallback(Relation index, HeapTuple htup, Datum *values,
 
 	/* If we've maxed out our available memory, dump everything to the index */
 	if (buildstate->accum.allocatedMemory >= (Size)maintenance_work_mem * 1024L)
-	{
-		ItemPointerData *list;
-		Datum		key;
-		GinNullCategory category;
-		uint32		nlist;
-		OffsetNumber attnum;
-
-		ginBeginBAScan(&buildstate->accum);
-		while ((list = ginGetBAEntry(&buildstate->accum,
-								  &attnum, &key, &category, &nlist)) != NULL)
-		{
-			/* there could be many entries, so be willing to abort here */
-			CHECK_FOR_INTERRUPTS();
-			ginEntryInsert(&buildstate->ginstate, attnum, key, category,
-						   list, nlist, &buildstate->buildStats);
-		}
-
-		MemoryContextReset(buildstate->tmpCtx);
-		ginInitBA(&buildstate->accum);
-	}
+		ginDumpCommon(buildstate, false);
 
 	MemoryContextSwitchTo(oldCtx);
 }
 
+/*
+ * Claim "max_blocks" or less blocks. Return the actual number of claimed
+ * blocks and set "first" to point to the first block of the claimed range.
+ * 0 return value means the task has been finished.
+ */
+static int
+claimSomeBlocks(volatile GinBuildTask *task, int max_blocks, int *first)
+{
+	int blocks = 0;
+
+	SpinLockAcquire(&task->lock);
+
+	if (task->scanned >= task->to_scan)
+	{
+		SpinLockRelease(&task->lock);
+		return 0;
+	}
+
+	*first = task->scanned;
+	blocks = max_blocks;
+	if (blocks > task->to_scan - task->scanned)
+		blocks = task->to_scan - task->scanned;
+	task->scanned += blocks;
+
+	SpinLockRelease(&task->lock);
+	return blocks;
+}
+
+static void
+reportReltuples(volatile GinBuildTask *task, double reltuples)
+{
+	SpinLockAcquire(&task->lock);
+	task->reltuples += reltuples;
+	SpinLockRelease(&task->lock);
+}
+
+static double
+ginbuildCommon(GinBuildState *buildstate, Relation heap, Relation index, IndexInfo *indexInfo)
+{
+	double reltuples = 0;
+
+	/*
+	 * create a temporary memory context that is used to hold data not yet
+	 * dumped out to the index
+	 */
+	buildstate->tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
+											   "Gin build temporary context",
+											   ALLOCSET_DEFAULT_MINSIZE,
+											   ALLOCSET_DEFAULT_INITSIZE,
+											   ALLOCSET_DEFAULT_MAXSIZE);
+
+	/*
+	 * create a temporary memory context that is used for calling
+	 * ginExtractEntries(), and can be reset after each tuple
+	 */
+	buildstate->funcCtx = AllocSetContextCreate(CurrentMemoryContext,
+												"Gin build temporary context for user-defined function",
+												ALLOCSET_DEFAULT_MINSIZE,
+												ALLOCSET_DEFAULT_INITSIZE,
+												ALLOCSET_DEFAULT_MAXSIZE);
+
+	buildstate->accum.ginstate = &buildstate->ginstate;
+	ginInitBA(&buildstate->accum);
+
+	/*
+	 * Do the heap scan.  We disallow sync scan here because dataPlaceToPage
+	 * prefers to receive tuples in TID order.
+	 */
+	while (true)
+	{
+		double subtuples;
+		int first, blocks;
+
+		blocks = claimSomeBlocks(buildstate->task, GIN_BLOCKS_PER_WORKER, &first);
+		if (blocks == 0)
+			break;
+
+		subtuples = IndexBuildHeapRangeScan(heap, index, indexInfo, false, false,
+											first, blocks,
+											ginBuildCallback, (void *)buildstate);
+		reltuples += subtuples;
+	}
+
+	/* dump remaining entries to the index */
+	ginDumpCommon(buildstate, true);
+
+	MemoryContextDelete(buildstate->funcCtx);
+	MemoryContextDelete(buildstate->tmpCtx);
+
+	/*
+	 * Update metapage stats
+	 */
+	buildstate->buildStats.nTotalPages = RelationGetNumberOfBlocks(index);
+	ginUpdateStats(index, &buildstate->buildStats);
+
+	return reltuples;
+}
+
+/*
+ * The idea behind parallel GIN build is letting the parallel workers scan the
+ * relation and build rbtree in parallel. Each block is scanned by one of the
+ * workers.
+ */
+static void
+ginbuildWorker(dsm_segment *seg, shm_toc *toc)
+{
+	GinBuildState buildstate;
+
+	Relation heap;
+	Relation index;
+	IndexInfo *indexInfo;
+
+	double reltuples;
+
+	char *shm_origin;
+	int mqsize;
+
+	buildstate.task = (GinBuildTask*)shm_toc_lookup(toc, KEY_TASK);
+
+	shm_origin = (char *)shm_toc_lookup(toc, KEY_SHM_ORIGIN);
+	mqsize = *(int*)shm_toc_lookup(toc, KEY_SHM_PER_WORKER);
+	buildstate.mq = (shm_mq *)(shm_origin + ParallelWorkerNumber * mqsize);
+	shm_mq_set_sender(buildstate.mq, MyProc);
+	buildstate.mqh = shm_mq_attach(buildstate.mq, seg, NULL);
+	shm_mq_wait_for_attach(buildstate.mqh);
+
+	heap = heap_open(buildstate.task->heap_oid, NoLock);
+	index = index_open(buildstate.task->index_oid, NoLock);
+	indexInfo = BuildIndexInfo(index);
+
+	initGinState(&buildstate.ginstate, index);
+	buildstate.indtuples = 0;
+	memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
+
+	reltuples = ginbuildCommon(&buildstate, heap, index, indexInfo);
+
+	index_close(index, NoLock);
+	heap_close(heap, NoLock);
+
+	reportReltuples(buildstate.task, reltuples);
+
+	shm_mq_detach(buildstate.mq);
+}
+
+static void
+initTask(volatile GinBuildTask *task, Relation heap, Relation index)
+{
+	task->to_scan = RelationGetNumberOfBlocks(heap);
+	task->scanned = 0;
+	SpinLockInit(&task->lock);
+	task->heap_oid = RelationGetRelid(heap);
+	task->index_oid = RelationGetRelid(index);
+	task->reltuples = 0;
+}
+
+static void
+launchWorkers(GinBuildState *buildstate, int wnum, Relation heap, Relation index)
+{
+	int i;
+	void *origin;
+	int *mqsize;
+
+	EnterParallelMode();
+	buildstate->pcxt = CreateParallelContext(ginbuildWorker, wnum);
+	{
+		int size = 0, keys = 0;
+		keys++; size += sizeof(GinBuildTask);
+		keys++; size += gin_shared_mem * 1024;
+		keys++; size += sizeof(int); /* for mqsize */
+
+		shm_toc_estimate_chunk(&buildstate->pcxt->estimator, size);
+		shm_toc_estimate_keys(&buildstate->pcxt->estimator, keys);
+	}
+	InitializeParallelDSM(buildstate->pcxt);
+
+	buildstate->task = (GinBuildTask*)shm_toc_allocate(buildstate->pcxt->toc, sizeof(GinBuildTask));
+	shm_toc_insert(buildstate->pcxt->toc, KEY_TASK, (GinBuildTask*)buildstate->task);
+
+	origin = shm_toc_allocate(buildstate->pcxt->toc, gin_shared_mem * 1024);
+	shm_toc_insert(buildstate->pcxt->toc, KEY_SHM_ORIGIN, origin);
+
+	mqsize = (int *)shm_toc_allocate(buildstate->pcxt->toc, sizeof(int));
+	*mqsize = gin_shared_mem * 1024 / buildstate->pcxt->nworkers;
+	shm_toc_insert(buildstate->pcxt->toc, KEY_SHM_PER_WORKER, mqsize);
+
+	initTask(buildstate->task, heap, index);
+
+	buildstate->results = palloc(buildstate->pcxt->nworkers * sizeof(WorkerResult));
+	for (i = 0; i < buildstate->pcxt->nworkers; i++)
+	{
+		WorkerResult *r = buildstate->results + i;
+		r->mq = shm_mq_create((char *)origin + i * (*mqsize), *mqsize);
+		shm_mq_set_receiver(r->mq, MyProc);
+		r->mqh = shm_mq_attach(r->mq, buildstate->pcxt->seg, NULL);
+		r->end_of_tree = false;
+		r->end_of_forest = false;
+		r->msg_body = NULL;
+		r->msg_len = 0;
+		r->skipkey = 0;
+	}
+
+	LaunchParallelWorkers(buildstate->pcxt);
+}
+
+static void
+finishWorkers(GinBuildState *buildstate, GinBuildTask *task)
+{
+	WaitForParallelWorkersToFinish(buildstate->pcxt);
+	memcpy(task, (GinBuildTask *)buildstate->task, sizeof(GinBuildTask)); /* copy the task out of the context before destroing it */
+	DestroyParallelContext(buildstate->pcxt);
+	ExitParallelMode();
+}
+
 IndexBuildResult *
 ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 {
+	GinBuildState buildstate;
+
 	IndexBuildResult *result;
-	double		reltuples;
-	GinBuildState buildstate;
 	Buffer		RootBuffer,
 				MetaBuffer;
-	ItemPointerData *list;
-	Datum		key;
-	GinNullCategory category;
-	uint32		nlist;
-	MemoryContext oldCtx;
-	OffsetNumber attnum;
+	double reltuples = 0;
+	int wnum = gin_parallel_workers;
 
 	if (RelationGetNumberOfBlocks(index) != 0)
 		elog(ERROR, "index \"%s\" already contains data",
 			 RelationGetRelationName(index));
 
-	initGinState(&buildstate.ginstate, index);
-	buildstate.indtuples = 0;
-	memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
-
 	/* initialize the meta page */
 	MetaBuffer = GinNewBuffer(index);
 
@@ -363,60 +962,40 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	UnlockReleaseBuffer(RootBuffer);
 	END_CRIT_SECTION();
 
+	initGinState(&buildstate.ginstate, index);
+	buildstate.indtuples = 0;
+	memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
+
 	/* count the root as first entry page */
 	buildstate.buildStats.nEntryPages++;
 
-	/*
-	 * create a temporary memory context that is used to hold data not yet
-	 * dumped out to the index
-	 */
-	buildstate.tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
-											  "Gin build temporary context",
-											  ALLOCSET_DEFAULT_MINSIZE,
-											  ALLOCSET_DEFAULT_INITSIZE,
-											  ALLOCSET_DEFAULT_MAXSIZE);
-
-	/*
-	 * create a temporary memory context that is used for calling
-	 * ginExtractEntries(), and can be reset after each tuple
-	 */
-	buildstate.funcCtx = AllocSetContextCreate(CurrentMemoryContext,
-					 "Gin build temporary context for user-defined function",
-											   ALLOCSET_DEFAULT_MINSIZE,
-											   ALLOCSET_DEFAULT_INITSIZE,
-											   ALLOCSET_DEFAULT_MAXSIZE);
-
-	buildstate.accum.ginstate = &buildstate.ginstate;
-	ginInitBA(&buildstate.accum);
+	if ((wnum > 0) && RelationUsesLocalBuffers(heap))
+	{
+		elog(DEBUG1, "not using parallel GIN build on temporary table %s\n", RelationGetRelationName(heap));
+		wnum = 0;
+	}
 
-	/*
-	 * Do the heap scan.  We disallow sync scan here because dataPlaceToPage
-	 * prefers to receive tuples in TID order.
-	 */
-	reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
-								   ginBuildCallback, (void *) &buildstate);
+	buildstate.pcxt = NULL;
 
-	/* dump remaining entries to the index */
-	oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx);
-	ginBeginBAScan(&buildstate.accum);
-	while ((list = ginGetBAEntry(&buildstate.accum,
-								 &attnum, &key, &category, &nlist)) != NULL)
 	{
-		/* there could be many entries, so be willing to abort here */
-		CHECK_FOR_INTERRUPTS();
-		ginEntryInsert(&buildstate.ginstate, attnum, key, category,
-					   list, nlist, &buildstate.buildStats);
-	}
-	MemoryContextSwitchTo(oldCtx);
+		GinBuildTask task;
+		double backend_reltuples = 0;
 
-	MemoryContextDelete(buildstate.funcCtx);
-	MemoryContextDelete(buildstate.tmpCtx);
+		if (wnum > 0)
+		{
+			launchWorkers(&buildstate, wnum, heap, index);
+			backend_reltuples += ginbuildCommon(&buildstate, heap, index, indexInfo);
+			finishWorkers(&buildstate, &task);
+		}
+		else
+		{
+			buildstate.task = &task;
+			initTask(buildstate.task, heap, index);
+			backend_reltuples += ginbuildCommon(&buildstate, heap, index, indexInfo);
+		}
 
-	/*
-	 * Update metapage stats
-	 */
-	buildstate.buildStats.nTotalPages = RelationGetNumberOfBlocks(index);
-	ginUpdateStats(index, &buildstate.buildStats);
+		reltuples = backend_reltuples + task.reltuples;
+	}
 
 	/*
 	 * Return statistics
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index a325943..32c6ea0 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2775,6 +2775,31 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"gin_parallel_workers",
+			PGC_USERSET,
+			RESOURCES_ASYNCHRONOUS,
+			gettext_noop("Maximum number of parallel workers for GIN buiding."),
+			NULL,
+		},
+		&gin_parallel_workers,
+		0, 0, MAX_BACKENDS,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"gin_shared_mem",
+			PGC_USERSET,
+			RESOURCES_ASYNCHRONOUS,
+			gettext_noop("The size of shared memory segment for parallel GIN buiding."),
+			NULL,
+			GUC_UNIT_KB
+		},
+		&gin_shared_mem,
+		16 * 1024, 1024, MAX_KILOBYTES,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 773b4e8..321d572 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -541,6 +541,8 @@
 #xmloption = 'content'
 #gin_fuzzy_search_limit = 0
 #gin_pending_list_limit = 4MB
+#gin_parallel_workers = 0		# 0 disables parallel gin build
+#gin_shared_mem = 16MB
 
 # - Locale and Formatting -
 
diff --git a/src/include/access/gin.h b/src/include/access/gin.h
index e5b2e10..91e5b27 100644
--- a/src/include/access/gin.h
+++ b/src/include/access/gin.h
@@ -68,6 +68,8 @@ typedef char GinTernaryValue;
 /* GUC parameters */
 extern PGDLLIMPORT int GinFuzzySearchLimit;
 extern int	gin_pending_list_limit;
+extern int	gin_parallel_workers;
+extern int	gin_shared_mem;
 
 /* ginutil.c */
 extern void ginGetStats(Relation index, GinStatsData *stats);
diff --git a/src/test/regress/expected/gin.out b/src/test/regress/expected/gin.out
index a3911a6..fc6bac0 100644
--- a/src/test/regress/expected/gin.out
+++ b/src/test/regress/expected/gin.out
@@ -23,6 +23,88 @@ select gin_clean_pending_list('gin_test_idx'); -- nothing to flush
                       0
 (1 row)
 
+-- Test parallel building
+drop index gin_test_idx;
+select count(*) from gin_test_tbl where i @> array[1,2];
+ count 
+-------
+ 20002
+(1 row)
+
+select count(*) from gin_test_tbl where i @> array[2];
+ count 
+-------
+ 20002
+(1 row)
+
+select count(*) from gin_test_tbl where i @> array[8];
+ count 
+-------
+     3
+(1 row)
+
+set gin_parallel_workers = 1;
+create index gin_test_idx on gin_test_tbl using gin (i);
+select count(*) from gin_test_tbl where i @> array[1,2];
+ count 
+-------
+ 20002
+(1 row)
+
+select count(*) from gin_test_tbl where i @> array[2];
+ count 
+-------
+ 20002
+(1 row)
+
+select count(*) from gin_test_tbl where i @> array[8];
+ count 
+-------
+     3
+(1 row)
+
+drop index gin_test_idx;
+set gin_parallel_workers = 2;
+create index gin_test_idx on gin_test_tbl using gin (i);
+select count(*) from gin_test_tbl where i @> array[1,2];
+ count 
+-------
+ 20002
+(1 row)
+
+select count(*) from gin_test_tbl where i @> array[2];
+ count 
+-------
+ 20002
+(1 row)
+
+select count(*) from gin_test_tbl where i @> array[8];
+ count 
+-------
+     3
+(1 row)
+
+drop index gin_test_idx;
+set gin_parallel_workers = 8;
+create index gin_test_idx on gin_test_tbl using gin (i);
+select count(*) from gin_test_tbl where i @> array[1,2];
+ count 
+-------
+ 20002
+(1 row)
+
+select count(*) from gin_test_tbl where i @> array[2];
+ count 
+-------
+ 20002
+(1 row)
+
+select count(*) from gin_test_tbl where i @> array[8];
+ count 
+-------
+     3
+(1 row)
+
 -- Test vacuuming
 delete from gin_test_tbl where i @> array[2];
 vacuum gin_test_tbl;
diff --git a/src/test/regress/sql/gin.sql b/src/test/regress/sql/gin.sql
index c566e9b..8b0eb45 100644
--- a/src/test/regress/sql/gin.sql
+++ b/src/test/regress/sql/gin.sql
@@ -19,6 +19,33 @@ vacuum gin_test_tbl; -- flush the fastupdate buffers
 
 select gin_clean_pending_list('gin_test_idx'); -- nothing to flush
 
+-- Test parallel building
+
+drop index gin_test_idx;
+select count(*) from gin_test_tbl where i @> array[1,2];
+select count(*) from gin_test_tbl where i @> array[2];
+select count(*) from gin_test_tbl where i @> array[8];
+
+set gin_parallel_workers = 1;
+create index gin_test_idx on gin_test_tbl using gin (i);
+select count(*) from gin_test_tbl where i @> array[1,2];
+select count(*) from gin_test_tbl where i @> array[2];
+select count(*) from gin_test_tbl where i @> array[8];
+
+drop index gin_test_idx;
+set gin_parallel_workers = 2;
+create index gin_test_idx on gin_test_tbl using gin (i);
+select count(*) from gin_test_tbl where i @> array[1,2];
+select count(*) from gin_test_tbl where i @> array[2];
+select count(*) from gin_test_tbl where i @> array[8];
+
+drop index gin_test_idx;
+set gin_parallel_workers = 8;
+create index gin_test_idx on gin_test_tbl using gin (i);
+select count(*) from gin_test_tbl where i @> array[1,2];
+select count(*) from gin_test_tbl where i @> array[2];
+select count(*) from gin_test_tbl where i @> array[8];
+
 -- Test vacuuming
 delete from gin_test_tbl where i @> array[2];
 vacuum gin_test_tbl;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to