Hi, hackers. The task of building a GIN index can require a lot of time and eats 100% of one CPU, but we could easily make it use more than 100%, especially since we now have parallel workers in Postgres.
The process of building GIN looks like this: 1. Accumulate a batch of index records into an rbtree in maintenance work memory. 2. Dump the batch to disk. 3. Repeat. I have a draft implementation which divides the whole process between N parallel workers — see the attached patch. Instead of a full scan of the relation, I give each worker a range of blocks to read. This speeds up the first step N times, but slows down the second one, because when multiple workers dump item pointers for the same key, each of them has to read and decode the results of the previous one. That is a huge waste, but there is an idea on how to eliminate it. When it comes to dumping the next batch, a worker does not do it independently. Instead, it (and every other worker) sends the accumulated index records to the parent (backend) in ascending key order. The backend, which receives the records from the workers through shared memory, can merge them and dump each of them once, without the need to reread the records N-1 times. In its current state the implementation is just a proof of concept: all the configuration is hardcoded, but it already works as is, though it does not speed up the build process more than 4 times on my configuration (12 CPUs). There is also a problem with temporary tables, for which parallel mode does not work. Please leave your feedback. Regards, Constantin S. Pan Postgres Professional: http://www.postgrespro.com The Russian Postgres Company
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index 8bcd159..6ea8f78 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -16,11 +16,14 @@ #include "access/gin_private.h" #include "access/xloginsert.h" +#include "access/parallel.h" +#include "access/xact.h" #include "catalog/index.h" #include "miscadmin.h" #include "storage/bufmgr.h" #include "storage/smgr.h" #include "storage/indexfsm.h" +#include "storage/spin.h" #include "utils/memutils.h" #include "utils/rel.h" @@ -265,6 +268,10 @@ ginHeapTupleBulkInsert(GinBuildState *buildstate, OffsetNumber attnum, MemoryContextReset(buildstate->funcCtx); } +#define KEY_TASK 42 +#define GIN_MAX_WORKERS 4 +#define GIN_BLOCKS_PER_WORKER 4 + static void ginBuildCallback(Relation index, HeapTuple htup, Datum *values, bool *isnull, bool tupleIsAlive, void *state) @@ -306,17 +313,34 @@ ginBuildCallback(Relation index, HeapTuple htup, Datum *values, MemoryContextSwitchTo(oldCtx); } -Datum -ginbuild(PG_FUNCTION_ARGS) -{ - Relation heap = (Relation) PG_GETARG_POINTER(0); - Relation index = (Relation) PG_GETARG_POINTER(1); - IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2); - IndexBuildResult *result; - double reltuples; +/* + * This structure describes the GIN build task for the parallel workers. We use + * OIDs here because workers are separate processes and pointers may become + * meaningless for them. The "lock" is used to protect the "scanned" field as + * the workers read and increase its value to distribute the task between each + * other. + */ +typedef struct { + int to_scan; + int scanned; + slock_t lock; + Oid heap_oid; + Oid index_oid; +} PGinBuildTask; + +/* + * The idea behind parallel GIN build is letting the parallel workers scan the + * relation and build rbtree in parallel. Each block is scanned by one of the + * workers. 
+ */ +static void pginbuild(dsm_segment *seg, shm_toc *toc) { GinBuildState buildstate; - Buffer RootBuffer, - MetaBuffer; + + Relation heap; + Relation index; + IndexInfo *indexInfo; + + int reltuples = 0; ItemPointerData *list; Datum key; GinNullCategory category; @@ -324,48 +348,16 @@ ginbuild(PG_FUNCTION_ARGS) MemoryContext oldCtx; OffsetNumber attnum; - if (RelationGetNumberOfBlocks(index) != 0) - elog(ERROR, "index \"%s\" already contains data", - RelationGetRelationName(index)); + volatile PGinBuildTask *task = (PGinBuildTask*)shm_toc_lookup(toc, KEY_TASK); + + heap = heap_open(task->heap_oid, NoLock); + index = index_open(task->index_oid, NoLock); + indexInfo = BuildIndexInfo(index); initGinState(&buildstate.ginstate, index); buildstate.indtuples = 0; memset(&buildstate.buildStats, 0, sizeof(GinStatsData)); - /* initialize the meta page */ - MetaBuffer = GinNewBuffer(index); - - /* initialize the root page */ - RootBuffer = GinNewBuffer(index); - - START_CRIT_SECTION(); - GinInitMetabuffer(MetaBuffer); - MarkBufferDirty(MetaBuffer); - GinInitBuffer(RootBuffer, GIN_LEAF); - MarkBufferDirty(RootBuffer); - - if (RelationNeedsWAL(index)) - { - XLogRecPtr recptr; - Page page; - - XLogBeginInsert(); - XLogRegisterBuffer(0, MetaBuffer, REGBUF_WILL_INIT); - XLogRegisterBuffer(1, RootBuffer, REGBUF_WILL_INIT); - - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_INDEX); - - page = BufferGetPage(RootBuffer); - PageSetLSN(page, recptr); - - page = BufferGetPage(MetaBuffer); - PageSetLSN(page, recptr); - } - - UnlockReleaseBuffer(MetaBuffer); - UnlockReleaseBuffer(RootBuffer); - END_CRIT_SECTION(); - /* count the root as first entry page */ buildstate.buildStats.nEntryPages++; @@ -396,8 +388,27 @@ ginbuild(PG_FUNCTION_ARGS) * Do the heap scan. We disallow sync scan here because dataPlaceToPage * prefers to receive tuples in TID order. 
*/ - reltuples = IndexBuildHeapScan(heap, index, indexInfo, false, - ginBuildCallback, (void *) &buildstate); + while (true) { + int subtuples; + int scan_from, to_scan_this_time; + SpinLockAcquire(&task->lock); + if (task->scanned >= task->to_scan) + { + SpinLockRelease(&task->lock); + break; + } + scan_from = task->scanned; + to_scan_this_time = GIN_BLOCKS_PER_WORKER; + if (to_scan_this_time > task->to_scan - task->scanned) + to_scan_this_time = task->to_scan - task->scanned; + task->scanned += to_scan_this_time; + SpinLockRelease(&task->lock); + + subtuples = IndexBuildHeapRangeScan(heap, index, indexInfo, false, false, + scan_from, to_scan_this_time, + ginBuildCallback, (void *)&buildstate); + reltuples += subtuples; + }; /* dump remaining entries to the index */ oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx); @@ -421,13 +432,94 @@ ginbuild(PG_FUNCTION_ARGS) buildstate.buildStats.nTotalPages = RelationGetNumberOfBlocks(index); ginUpdateStats(index, &buildstate.buildStats); + index_close(index, NoLock); + heap_close(heap, NoLock); +} + +Datum +ginbuild(PG_FUNCTION_ARGS) +{ + Relation heap = (Relation) PG_GETARG_POINTER(0); + Relation index = (Relation) PG_GETARG_POINTER(1); + IndexBuildResult *result; + Buffer RootBuffer, + MetaBuffer; + + if (RelationGetNumberOfBlocks(index) != 0) + elog(ERROR, "index \"%s\" already contains data", + RelationGetRelationName(index)); + + /* initialize the meta page */ + MetaBuffer = GinNewBuffer(index); + + /* initialize the root page */ + RootBuffer = GinNewBuffer(index); + + START_CRIT_SECTION(); + GinInitMetabuffer(MetaBuffer); + MarkBufferDirty(MetaBuffer); + GinInitBuffer(RootBuffer, GIN_LEAF); + MarkBufferDirty(RootBuffer); + + if (RelationNeedsWAL(index)) + { + XLogRecPtr recptr; + Page page; + + XLogBeginInsert(); + XLogRegisterBuffer(0, MetaBuffer, REGBUF_WILL_INIT); + XLogRegisterBuffer(1, RootBuffer, REGBUF_WILL_INIT); + + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_INDEX); + + page = BufferGetPage(RootBuffer); 
+ PageSetLSN(page, recptr); + + page = BufferGetPage(MetaBuffer); + PageSetLSN(page, recptr); + } + + UnlockReleaseBuffer(MetaBuffer); + UnlockReleaseBuffer(RootBuffer); + END_CRIT_SECTION(); + + EnterParallelMode(); + { + int size = sizeof(PGinBuildTask); + int keys = 1; + PGinBuildTask *task; + ParallelContext *pcxt; + + pcxt = CreateParallelContext(pginbuild, GIN_MAX_WORKERS); + + shm_toc_estimate_chunk(&pcxt->estimator, size); + shm_toc_estimate_keys(&pcxt->estimator, keys); + InitializeParallelDSM(pcxt); + task = (PGinBuildTask*)shm_toc_allocate(pcxt->toc, size); + shm_toc_insert(pcxt->toc, KEY_TASK, (void*)task); + SpinLockInit(&task->lock); + task->to_scan = RelationGetNumberOfBlocks(heap); + task->heap_oid = heap->rd_id; + task->index_oid = index->rd_id; + LaunchParallelWorkers(pcxt); + + WaitForParallelWorkersToFinish(pcxt); + + DestroyParallelContext(pcxt); + } + ExitParallelMode(); + /* * Return statistics */ result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult)); - result->heap_tuples = reltuples; - result->index_tuples = buildstate.indtuples; + /* + * FIXME: implement statistics aggregation from the workers. + * These numbers do not mean anything until then. + */ + result->heap_tuples = 123; + result->index_tuples = 456; PG_RETURN_POINTER(result); }
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers