Hi, Hackers.

Building a GIN index can take a lot of time and eats 100 % of one CPU,
but we could easily make it use more than 100 %, especially since we
now have parallel workers in Postgres.

The process of building a GIN index looks like this (see the code
sketch after the list):

1. Accumulate a batch of index records into an rbtree that lives in
maintenance work memory (maintenance_work_mem).

2. Dump the batch to disk.

3. Repeat.
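
For reference, here is a minimal standalone sketch of that
batch-and-dump loop; the Batch type, dump_batch() and the byte counts
are illustrative stand-ins, not actual PostgreSQL code:

#include <stdio.h>
#include <stddef.h>

#define BATCH_LIMIT ((size_t) 64 * 1024 * 1024) /* "maintenance work memory" */

typedef struct
{
    size_t  mem_used;  /* bytes held by the in-memory rbtree */
    /* the key -> item-pointer-list tree itself would live here */
} Batch;

/* step 2: walk the tree in key order and write each posting list out */
static void
dump_batch(Batch *b)
{
    printf("dumping %zu bytes\n", b->mem_used);
    b->mem_used = 0;
}

int
main(void)
{
    Batch b = {0};

    for (long tuple = 0; tuple < 10000000; tuple++)
    {
        b.mem_used += 100;      /* step 1: accumulate index records */
        if (b.mem_used >= BATCH_LIMIT)
            dump_batch(&b);     /* step 2: dump the batch to disk */
    }                           /* step 3: repeat */
    if (b.mem_used > 0)
        dump_batch(&b);         /* dump whatever remains at the end */
    return 0;
}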

I have a draft implementation that divides the whole process among
N parallel workers; see the attached patch. Instead of a full scan of
the relation, I give each worker a range of blocks to read.
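
In the patch the workers claim block ranges from a counter in a DSM
segment, guarded by a spinlock. The same logic looks roughly like the
following standalone sketch, where pthreads and scan_range() stand in
for the backend machinery and IndexBuildHeapRangeScan():

#include <pthread.h>
#include <stdio.h>

#define NWORKERS         4
#define BLOCKS_PER_CHUNK 4    /* cf. GIN_BLOCKS_PER_WORKER in the patch */

static struct
{
    int             to_scan;  /* total number of blocks in the heap */
    int             scanned;  /* number of blocks handed out so far */
    pthread_mutex_t lock;
} task = {1000, 0, PTHREAD_MUTEX_INITIALIZER};

/* stand-in for IndexBuildHeapRangeScan() */
static void
scan_range(int from, int nblocks)
{
    (void) from;
    (void) nblocks;
}

static void *
worker(void *arg)
{
    (void) arg;
    for (;;)
    {
        int from, nblocks;

        pthread_mutex_lock(&task.lock);
        if (task.scanned >= task.to_scan)
        {
            pthread_mutex_unlock(&task.lock);
            break;
        }
        from = task.scanned;
        nblocks = BLOCKS_PER_CHUNK;
        if (nblocks > task.to_scan - task.scanned)
            nblocks = task.to_scan - task.scanned;
        task.scanned += nblocks;
        pthread_mutex_unlock(&task.lock);

        scan_range(from, nblocks);  /* scan this range, outside the lock */
    }
    return NULL;
}

int
main(void)
{
    pthread_t tids[NWORKERS];

    for (int i = 0; i < NWORKERS; i++)
        pthread_create(&tids[i], NULL, worker, NULL);
    for (int i = 0; i < NWORKERS; i++)
        pthread_join(tids[i], NULL);
    printf("scanned %d of %d blocks\n", task.scanned, task.to_scan);
    return 0;
}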

This speeds up the first step N times, but slows down the second one:
when multiple workers dump item pointers for the same key, each of them
has to read and decode the results of the previous one. That is a huge
waste, but I have an idea for how to eliminate it.

When it comes to dumping the next batch, a worker does not do it
independently. Instead, it (and every other worker) sends the
accumulated index records to the parent (the backend) in ascending key
order. The backend receives the records from the workers through shared
memory, merges them, and dumps each of them once, without the need to
reread the records N-1 times.
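
This merge step is not implemented yet, but assuming each worker
delivers its records in ascending key order, the backend-side merge
could look roughly like the following standalone sketch (integer keys
and in-memory streams stand in for real GIN keys and shm_mq queues):

#include <stdio.h>
#include <limits.h>

#define NWORKERS 4

typedef struct
{
    const int  *keys;  /* keys produced by one worker, ascending */
    int         nkeys;
    int         pos;
} Stream;

/* next key of a stream, or INT_MAX when the stream is exhausted */
static int
peek(const Stream *s)
{
    return s->pos < s->nkeys ? s->keys[s->pos] : INT_MAX;
}

int
main(void)
{
    static const int w0[] = {1, 4, 9};
    static const int w1[] = {2, 4, 7};
    static const int w2[] = {4, 8};
    static const int w3[] = {3};
    Stream streams[NWORKERS] = {
        {w0, 3, 0}, {w1, 3, 0}, {w2, 2, 0}, {w3, 1, 0}
    };

    for (;;)
    {
        int min_key = INT_MAX;

        /* find the smallest key any worker has to offer */
        for (int i = 0; i < NWORKERS; i++)
            if (peek(&streams[i]) < min_key)
                min_key = peek(&streams[i]);
        if (min_key == INT_MAX)
            break;  /* all streams exhausted */

        /*
         * Consume this key from every stream that has it. In the real
         * code the workers' posting lists for this key would be
         * concatenated here and written to the index exactly once.
         */
        printf("key %d from workers:", min_key);
        for (int i = 0; i < NWORKERS; i++)
        {
            if (peek(&streams[i]) == min_key)
            {
                printf(" %d", i);
                streams[i].pos++;
            }
        }
        printf("\n");
    }
    return 0;
}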

In its current state the implementation is just a proof of concept
with all the configuration hardcoded, but it already works as is,
though it does not speed up the build process more than 4 times on my
machine (12 CPUs). There is also a problem with temporary tables, for
which parallel mode does not work.

I would appreciate your feedback.

Regards,

Constantin S. Pan
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index 8bcd159..6ea8f78 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -16,11 +16,14 @@
 
 #include "access/gin_private.h"
 #include "access/xloginsert.h"
+#include "access/parallel.h"
+#include "access/xact.h"
 #include "catalog/index.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
 #include "storage/smgr.h"
 #include "storage/indexfsm.h"
+#include "storage/spin.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
@@ -265,6 +268,10 @@ ginHeapTupleBulkInsert(GinBuildState *buildstate, OffsetNumber attnum,
 	MemoryContextReset(buildstate->funcCtx);
 }
 
+#define KEY_TASK 42
+#define GIN_MAX_WORKERS 4
+#define GIN_BLOCKS_PER_WORKER 4
+
 static void
 ginBuildCallback(Relation index, HeapTuple htup, Datum *values,
 				 bool *isnull, bool tupleIsAlive, void *state)
@@ -306,17 +313,34 @@ ginBuildCallback(Relation index, HeapTuple htup, Datum *values,
 	MemoryContextSwitchTo(oldCtx);
 }
 
-Datum
-ginbuild(PG_FUNCTION_ARGS)
-{
-	Relation	heap = (Relation) PG_GETARG_POINTER(0);
-	Relation	index = (Relation) PG_GETARG_POINTER(1);
-	IndexInfo  *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
-	IndexBuildResult *result;
-	double		reltuples;
+/*
+ * This structure describes the GIN build task for the parallel workers. We use
+ * OIDs here because workers are separate processes and pointers may become
+ * meaningless for them. The "lock" is used to protect the "scanned" field as
+ * the workers read and increase its value to distribute the task between each
+ * other.
+ */
+typedef struct {
+	int		to_scan;
+	int		scanned;
+	slock_t	lock;
+	Oid		heap_oid;
+	Oid		index_oid;
+} PGinBuildTask;
+
+/*
+ * The idea behind parallel GIN build is letting the parallel workers scan the
+ * relation and build rbtree in parallel. Each block is scanned by one of the
+ * workers.
+ */
+static void pginbuild(dsm_segment *seg, shm_toc *toc) {
 	GinBuildState buildstate;
-	Buffer		RootBuffer,
-				MetaBuffer;
+
+	Relation heap;
+	Relation index;
+	IndexInfo *indexInfo;
+
+	int reltuples = 0;
 	ItemPointerData *list;
 	Datum		key;
 	GinNullCategory category;
@@ -324,48 +348,16 @@ ginbuild(PG_FUNCTION_ARGS)
 	MemoryContext oldCtx;
 	OffsetNumber attnum;
 
-	if (RelationGetNumberOfBlocks(index) != 0)
-		elog(ERROR, "index \"%s\" already contains data",
-			 RelationGetRelationName(index));
+	volatile PGinBuildTask *task = (PGinBuildTask*)shm_toc_lookup(toc, KEY_TASK);
+
+	heap = heap_open(task->heap_oid, NoLock);
+	index = index_open(task->index_oid, NoLock);
+	indexInfo = BuildIndexInfo(index);
 
 	initGinState(&buildstate.ginstate, index);
 	buildstate.indtuples = 0;
 	memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
 
-	/* initialize the meta page */
-	MetaBuffer = GinNewBuffer(index);
-
-	/* initialize the root page */
-	RootBuffer = GinNewBuffer(index);
-
-	START_CRIT_SECTION();
-	GinInitMetabuffer(MetaBuffer);
-	MarkBufferDirty(MetaBuffer);
-	GinInitBuffer(RootBuffer, GIN_LEAF);
-	MarkBufferDirty(RootBuffer);
-
-	if (RelationNeedsWAL(index))
-	{
-		XLogRecPtr	recptr;
-		Page		page;
-
-		XLogBeginInsert();
-		XLogRegisterBuffer(0, MetaBuffer, REGBUF_WILL_INIT);
-		XLogRegisterBuffer(1, RootBuffer, REGBUF_WILL_INIT);
-
-		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_INDEX);
-
-		page = BufferGetPage(RootBuffer);
-		PageSetLSN(page, recptr);
-
-		page = BufferGetPage(MetaBuffer);
-		PageSetLSN(page, recptr);
-	}
-
-	UnlockReleaseBuffer(MetaBuffer);
-	UnlockReleaseBuffer(RootBuffer);
-	END_CRIT_SECTION();
-
 	/* count the root as first entry page */
 	buildstate.buildStats.nEntryPages++;
 
@@ -396,8 +388,27 @@ ginbuild(PG_FUNCTION_ARGS)
 	 * Do the heap scan.  We disallow sync scan here because dataPlaceToPage
 	 * prefers to receive tuples in TID order.
 	 */
-	reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
-								   ginBuildCallback, (void *) &buildstate);
+	while (true) {
+		int subtuples;
+		int scan_from, to_scan_this_time;
+		SpinLockAcquire(&task->lock);
+		if (task->scanned >= task->to_scan)
+		{
+			SpinLockRelease(&task->lock);
+			break;
+		}
+		scan_from = task->scanned;
+		to_scan_this_time = GIN_BLOCKS_PER_WORKER;
+		if (to_scan_this_time > task->to_scan - task->scanned)
+			to_scan_this_time = task->to_scan - task->scanned;
+		task->scanned += to_scan_this_time;
+		SpinLockRelease(&task->lock);
+
+		subtuples = IndexBuildHeapRangeScan(heap, index, indexInfo, false, false,
+											scan_from, to_scan_this_time,
+											ginBuildCallback, (void *)&buildstate);
+		reltuples += subtuples;
+	}
 
 	/* dump remaining entries to the index */
 	oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx);
@@ -421,13 +432,94 @@ ginbuild(PG_FUNCTION_ARGS)
 	buildstate.buildStats.nTotalPages = RelationGetNumberOfBlocks(index);
 	ginUpdateStats(index, &buildstate.buildStats);
 
+	index_close(index, NoLock);
+	heap_close(heap, NoLock);
+}
+
+Datum
+ginbuild(PG_FUNCTION_ARGS)
+{
+	Relation	heap = (Relation) PG_GETARG_POINTER(0);
+	Relation	index = (Relation) PG_GETARG_POINTER(1);
+	IndexBuildResult *result;
+	Buffer		RootBuffer,
+				MetaBuffer;
+
+	if (RelationGetNumberOfBlocks(index) != 0)
+		elog(ERROR, "index \"%s\" already contains data",
+			 RelationGetRelationName(index));
+
+	/* initialize the meta page */
+	MetaBuffer = GinNewBuffer(index);
+
+	/* initialize the root page */
+	RootBuffer = GinNewBuffer(index);
+
+	START_CRIT_SECTION();
+	GinInitMetabuffer(MetaBuffer);
+	MarkBufferDirty(MetaBuffer);
+	GinInitBuffer(RootBuffer, GIN_LEAF);
+	MarkBufferDirty(RootBuffer);
+
+	if (RelationNeedsWAL(index))
+	{
+		XLogRecPtr	recptr;
+		Page		page;
+
+		XLogBeginInsert();
+		XLogRegisterBuffer(0, MetaBuffer, REGBUF_WILL_INIT);
+		XLogRegisterBuffer(1, RootBuffer, REGBUF_WILL_INIT);
+
+		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_INDEX);
+
+		page = BufferGetPage(RootBuffer);
+		PageSetLSN(page, recptr);
+
+		page = BufferGetPage(MetaBuffer);
+		PageSetLSN(page, recptr);
+	}
+
+	UnlockReleaseBuffer(MetaBuffer);
+	UnlockReleaseBuffer(RootBuffer);
+	END_CRIT_SECTION();
+
+	EnterParallelMode();
+	{
+		int size = sizeof(PGinBuildTask);
+		int keys = 1;
+		PGinBuildTask   *task;
+		ParallelContext *pcxt;
+
+		pcxt = CreateParallelContext(pginbuild, GIN_MAX_WORKERS);
+
+		shm_toc_estimate_chunk(&pcxt->estimator, size);
+		shm_toc_estimate_keys(&pcxt->estimator, keys);
+		InitializeParallelDSM(pcxt);
+		task = (PGinBuildTask*)shm_toc_allocate(pcxt->toc, size);
+		shm_toc_insert(pcxt->toc, KEY_TASK, (void*)task);
+		SpinLockInit(&task->lock);
+		task->to_scan = RelationGetNumberOfBlocks(heap);
+		task->heap_oid = heap->rd_id;
+		task->index_oid = index->rd_id;
+		LaunchParallelWorkers(pcxt);
+
+		WaitForParallelWorkersToFinish(pcxt);
+
+		DestroyParallelContext(pcxt);
+	}
+	ExitParallelMode();
+
 	/*
 	 * Return statistics
 	 */
 	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
 
-	result->heap_tuples = reltuples;
-	result->index_tuples = buildstate.indtuples;
+	/*
+	 * FIXME: implement statistics aggregation from the workers.
+	 * These numbers do not mean anything until then.
+	 */
+	result->heap_tuples = 123;
+	result->index_tuples = 456;
 
 	PG_RETURN_POINTER(result);
 }