Here is a new version of the patch, which:

1. Fixes some minor stylistic issues.
2. Uses binaryheap (instead of a custom ugly stack) for merging; the general idea is sketched below.

Regards,
Constantin S. Pan
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
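The merge in the backend is essentially a k-way merge of the sorted per-worker result streams, driven by a binary min-heap: pop the smallest head entry, refill the heap from the stream that entry came from, and combine entries that carry the same key. Here is a minimal standalone sketch of that idea in plain C, with integer keys standing in for GIN entries and a counter standing in for posting-list merging; it is only an illustration of the approach, not the patch's binaryheap.h-based code.

/*
 * k-way merge sketch: each sorted array plays the role of one worker's
 * result stream; a binary min-heap always exposes the globally smallest
 * unconsumed key, and equal keys from different streams get combined.
 */
#include <stdio.h>

typedef struct HeapItem
{
	int		key;		/* current key from the stream */
	int		stream;		/* which stream it came from */
	int		pos;		/* index of the next element in that stream */
} HeapItem;

static HeapItem heap[16];
static int	heap_size = 0;

static void
heap_swap(int a, int b)
{
	HeapItem	tmp = heap[a];

	heap[a] = heap[b];
	heap[b] = tmp;
}

static void
heap_push(HeapItem item)
{
	int		i = heap_size++;

	heap[i] = item;
	while (i > 0 && heap[(i - 1) / 2].key > heap[i].key)
	{
		heap_swap(i, (i - 1) / 2);
		i = (i - 1) / 2;
	}
}

static HeapItem
heap_pop(void)
{
	HeapItem	top = heap[0];
	int			i = 0;

	heap[0] = heap[--heap_size];
	for (;;)
	{
		int		smallest = i, l = 2 * i + 1, r = 2 * i + 2;

		if (l < heap_size && heap[l].key < heap[smallest].key)
			smallest = l;
		if (r < heap_size && heap[r].key < heap[smallest].key)
			smallest = r;
		if (smallest == i)
			break;
		heap_swap(i, smallest);
		i = smallest;
	}
	return top;
}

int
main(void)
{
	int		s0[] = {1, 4, 7},
			s1[] = {2, 4, 9},
			s2[] = {4, 8};
	int	   *streams[] = {s0, s1, s2};
	int		lengths[] = {3, 3, 2};
	int		i;

	/* seed the heap with the head entry of every "worker" stream */
	for (i = 0; i < 3; i++)
		heap_push((HeapItem) {streams[i][0], i, 1});

	while (heap_size > 0)
	{
		HeapItem	cur = heap_pop();
		int			merged = 1;

		/* refill from the stream the entry came from */
		if (cur.pos < lengths[cur.stream])
			heap_push((HeapItem) {streams[cur.stream][cur.pos], cur.stream, cur.pos + 1});

		/* combine entries with the same key coming from other streams */
		while (heap_size > 0 && heap[0].key == cur.key)
		{
			HeapItem	dup = heap_pop();

			merged++;
			if (dup.pos < lengths[dup.stream])
				heap_push((HeapItem) {streams[dup.stream][dup.pos], dup.stream, dup.pos + 1});
		}
		printf("key %d (merged from %d stream(s))\n", cur.key, merged);
	}
	return 0;
}

In the patch the same roles are played by getNextEntry(), which refills from a worker's message queue, and ginMergeItemPointers(), which combines the posting lists of entries with equal keys.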
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index d48a13f..c9ff58d 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -6844,6 +6844,35 @@ dynamic_library_path = 'C:\tools\postgresql;H:\my_project\lib;$libdir' </listitem> </varlistentry> + <varlistentry id="guc-gin-parallel-workers" xreflabel="gin_parallel_workers"> + <term><varname>gin_parallel_workers</varname> (<type>integer</type>) + <indexterm> + <primary><varname>gin_parallel_workers</> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + The maximum number of parallel workers to use when + building a GIN index. Set to 0 to disable parallel GIN build. + </para> + </listitem> + </varlistentry> + + <varlistentry id="guc-gin-shared-mem" xreflabel="gin_shared_mem"> + <term><varname>gin_shared_mem</varname> (<type>integer</type>) + <indexterm> + <primary><varname>gin_shared_mem</> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + The size of the shared memory segment for parallel GIN building. The segment + is used to transfer partial results from workers for final merging. + Ignored if not using parallel GIN build. + </para> + </listitem> + </varlistentry> + </variablelist> </sect2> </sect1> diff --git a/src/backend/access/gin/README b/src/backend/access/gin/README index fade0cb..4325d88 100644 --- a/src/backend/access/gin/README +++ b/src/backend/access/gin/README @@ -49,7 +49,7 @@ Features * Write-Ahead Logging (WAL). (Recoverability from crashes.) * User-defined opclasses. (The scheme is similar to GiST.) * Optimized index creation (Makes use of maintenance_work_mem to accumulate - postings in memory.) + postings in memory and gin_parallel_workers to speed the process up.) * Text search support via an opclass * Soft upper limit on the returned results set using a GUC variable: gin_fuzzy_search_limit @@ -324,6 +324,24 @@ page-deletions safe; it stamps the deleted pages with an XID and keeps the deleted pages around with the right-link intact until all concurrent scans have finished.) +Parallel Building +----------------- + +The most expensive part of GIN index building is accumulating the rbtree. GIN +supports parallel workers that divide the work among themselves. This +speeds up the accumulating stage almost perfectly, but slows down the dumping +of the result a little, since the backend now needs to merge lists that +correspond to the same key but come from multiple workers. + +When it is time to build a GIN index on a relation, the backend checks the value of the +gin_parallel_workers configuration variable and tries to launch the corresponding +number of parallel workers. The backend also sets up a shared memory segment of +configurable size (gin_shared_mem). The workers scan the relation, dividing it +by blocks, until they fill maintenance_work_mem, then send the accumulated +tree to the backend through the shared memory segment. The backend merges the +trees and inserts the entries into the index. The process repeats until the +relation is fully scanned.
+ Compatibility ------------- diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index cd21e0e..d5cbf6c 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -16,26 +16,95 @@ #include "access/gin_private.h" #include "access/xloginsert.h" +#include "access/parallel.h" +#include "access/xact.h" #include "catalog/index.h" #include "miscadmin.h" #include "storage/bufmgr.h" #include "storage/smgr.h" #include "storage/indexfsm.h" +#include "storage/spin.h" +#include "utils/datum.h" #include "utils/memutils.h" #include "utils/rel.h" +#include "lib/binaryheap.h" +/* GUC parameter */ +int gin_parallel_workers = 0; +int gin_shared_mem = 0; -typedef struct +#define KEY_TASK 42 +#define KEY_SHM_ORIGIN 43 +#define KEY_SHM_PER_WORKER 44 +#define GIN_BLOCKS_PER_WORKER 4 + +/* + * The results are passed through the shared message queue as a sequence of + * messages of the following structure: GinShmemEntry, Key, List + */ +typedef struct GinShmemEntry +{ + GinNullCategory category; + OffsetNumber attnum; + int nlist; +} GinShmemEntry; + +/* + * This structure is used by the backend to track the current state of the + * workers. + */ +typedef struct WorkerResult +{ + void *mq; + shm_mq_handle *mqh; + bool end_of_tree; + bool end_of_forest; +} WorkerResult; + +typedef struct GinEntry +{ + int worker; + Datum key; + GinNullCategory category; + OffsetNumber attnum; + ItemPointerData *list; + uint32 nlist; +} GinEntry; + +/* + * This shared structure describes the GIN build task for the parallel workers. + * We use OIDs here because workers are separate processes and pointers may + * become meaningless for them. The "lock" is used to protect the "scanned" and + * "reltuples" fields as the workers modify them. + */ +typedef struct GinBuildTask +{ + int to_scan; + int scanned; + slock_t lock; + Oid heap_oid; + Oid index_oid; + double reltuples; +} GinBuildTask; + +typedef struct GinBuildState { - GinState ginstate; - double indtuples; - GinStatsData buildStats; - MemoryContext tmpCtx; - MemoryContext funcCtx; + GinState ginstate; + double indtuples; + GinStatsData buildStats; + MemoryContext tmpCtx; + MemoryContext funcCtx; BuildAccumulator accum; + + ParallelContext *pcxt; + volatile GinBuildTask *task; + WorkerResult *results; /* NULL in workers, array in backend */ + + /* these only get used by workers */ + shm_mq *mq; + shm_mq_handle *mqh; } GinBuildState; - /* * Adds array of item pointers to tuple's posting list, or * creates posting tree and tuple pointing to tree in case @@ -265,6 +334,334 @@ ginHeapTupleBulkInsert(GinBuildState *buildstate, OffsetNumber attnum, MemoryContextReset(buildstate->funcCtx); } +/* + * Dump one GIN entry from the worker to the backend (through the shared + * message queue). + */ +static void +ginDumpEntry(GinState *ginstate, + shm_mq_handle *mqh, + OffsetNumber attnum, + Datum key, + GinNullCategory category, + ItemPointerData *list, + int nlist) +{ + int keylen, listlen; + + bool isnull; + Form_pg_attribute att; + GinShmemEntry e; + + /* + * The message consists of 2 or 3 parts. The IO vector allows us to send + * them as one message though the parts are located at unrelated addresses. 
+ */ + shm_mq_iovec iov[3]; + int iovlen = 0; + + char *buf = NULL; + + e.category = category; + e.attnum = attnum; + e.nlist = nlist; + + Assert(nlist > 0); + + isnull = category == GIN_CAT_NULL_KEY; + att = ginstate->origTupdesc->attrs[attnum - 1]; + + if (e.category == GIN_CAT_NORM_KEY) + { + keylen = datumEstimateSpace(key, isnull, att->attbyval, att->attlen); + Assert(keylen > 0); + listlen = e.nlist * sizeof(ItemPointerData); + } + else + { + keylen = 0; + } + + listlen = e.nlist * sizeof(ItemPointerData); + + iov[iovlen].data = (char *)&e; + iov[iovlen++].len = sizeof(e); + + if (keylen > 0) + { + char *cursor; + buf = palloc(keylen); + cursor = buf; + datumSerialize(key, isnull, att->attbyval, att->attlen, &cursor); + iov[iovlen].data = buf; + iov[iovlen++].len = keylen; + } + + iov[iovlen].data = (char *)list; + iov[iovlen++].len = listlen; + + if (shm_mq_sendv(mqh, iov, iovlen, false) != SHM_MQ_SUCCESS) + elog(ERROR, + "worker %d failed to send a result entry to the backend", + ParallelWorkerNumber); + + if (buf) + pfree(buf); +} + +/* + * Send the accumulated tree from the worker to the backend (through the shared + * message queue). + */ +static void +ginSendTree(GinBuildState *buildstate) +{ + ItemPointerData *list; + Datum key; + GinNullCategory category; + uint32 nlist; + OffsetNumber attnum; + MemoryContext oldCtx; + + Assert(IsParallelWorker()); + oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx); + + ginBeginBAScan(&buildstate->accum); + while ((list = ginGetBAEntry(&buildstate->accum, + &attnum, &key, &category, &nlist)) != NULL) + { + /* there could be many entries, so be willing to abort here */ + CHECK_FOR_INTERRUPTS(); + + ginDumpEntry(&buildstate->ginstate, + buildstate->mqh, attnum, key, + category, list, nlist); + } + + MemoryContextReset(buildstate->tmpCtx); + ginInitBA(&buildstate->accum); + + /* send an empty message as an "end-of-tree" marker */ + if (shm_mq_send(buildstate->mqh, 0, NULL, false) != SHM_MQ_SUCCESS) + { + elog(ERROR, + "worker %d failed to send the end-of-tree marker to the backend", + ParallelWorkerNumber); + } + + MemoryContextSwitchTo(oldCtx); +} + +/* + * Resets the 'end-of-tree' marker for all workers. + */ +static void +resetEndOfTree(GinBuildState *buildstate) +{ + int i; + int wnum = buildstate->pcxt ? buildstate->pcxt->nworkers_launched : 0; + for (i = 0; i < wnum; ++i) + { + WorkerResult *r = buildstate->results + i; + r->end_of_tree = false; + } +} + +/* + * Get the next entry from the message queue (or from the rbtree directly, if + * backend, i.e. worker < 0). 
+ */ +static GinEntry * +getNextEntry(GinBuildState *buildstate, int worker) +{ + GinEntry e, *ecopy; + e.worker = worker; + + if (worker < 0) + { + e.list = ginGetBAEntry(&buildstate->accum, + &e.attnum, + &e.key, + &e.category, + &e.nlist); + if (!e.list) + return NULL; + } + else + { + GinShmemEntry *shentry; + char *cursor; + Size msglen; + void *msg; + WorkerResult *r = buildstate->results + worker; + + if (r->end_of_forest || r->end_of_tree) + return NULL; + + if (shm_mq_receive(r->mqh, &msglen, &msg, false) != SHM_MQ_SUCCESS) + { + r->end_of_forest = true; + return NULL; + } + + if (msglen == 0) /* end-of-tree */ + { + r->end_of_tree = true; + return NULL; + } + + cursor = msg; + Assert(cursor != NULL); + shentry = (GinShmemEntry*)cursor; + cursor += sizeof(shentry); + + e.key = 0; + if (shentry->category == GIN_CAT_NORM_KEY) + { + bool isnull; + e.key = datumRestore(&cursor, &isnull); + Assert(!isnull); + } + e.nlist = shentry->nlist; + e.list = palloc(sizeof(ItemPointerData) * e.nlist); + memcpy(e.list, cursor, sizeof(ItemPointerData) * e.nlist); + e.attnum = shentry->attnum; + e.category = shentry->category; + } + + ecopy = palloc(sizeof(e)); + memcpy(ecopy, &e, sizeof(e)); + return ecopy; +} + +static int +compare(GinBuildState *buildstate, GinEntry *a, GinEntry *b) +{ + return ginCompareAttEntries(&buildstate->ginstate, + a->attnum, a->key, a->category, + b->attnum, b->key, b->category); +} + +static int binheap_compare(Datum a, Datum b, void *arg) +{ + return -compare((GinBuildState *)arg, (GinEntry *)a, (GinEntry *)b); +} + +/* + * Merge the trees from all ready (but unfinished) workers (and from myself). + * Return the number of entries inserted. + */ +static int +mergeTrees(GinBuildState *buildstate) +{ + GinEntry *minentry = NULL; + binaryheap *binheap = NULL; + int i; + int inserted = 0; + int wnum = buildstate->pcxt ? buildstate->pcxt->nworkers_launched : 0; + + ginBeginBAScan(&buildstate->accum); + + binheap = binaryheap_allocate(wnum + 1, binheap_compare, buildstate); + + /* Populate the binheap */ + for (i = -1; i < wnum; i++) + { + GinEntry *e = getNextEntry(buildstate, i); + if (e) + binaryheap_add(binheap, PointerGetDatum(e)); + } + + while (!binaryheap_empty(binheap)) + { + GinEntry *candidate = (GinEntry *)DatumGetPointer(binaryheap_remove_first(binheap)); + + { + /* refill from the same message queue */ + GinEntry *e; + e = getNextEntry(buildstate, candidate->worker); + if (e) + binaryheap_add(binheap, PointerGetDatum(e)); + } + + if (minentry) + { + int cmp = compare(buildstate, candidate, minentry); + if (cmp) + { + /* Merge finished, insert the entry into the index. */ + Assert(cmp > 0); + ginEntryInsert(&buildstate->ginstate, + minentry->attnum, minentry->key, minentry->category, + minentry->list, minentry->nlist, + &buildstate->buildStats); + inserted++; + pfree(minentry->list); + pfree(minentry); + minentry = candidate; + } else { + /* Merge the candidate with minentry. 
*/ + int newnlist; + + ItemPointerData *oldlist = minentry->list; + minentry->list = ginMergeItemPointers(minentry->list, minentry->nlist, + candidate->list, candidate->nlist, + &newnlist); + minentry->nlist = newnlist; + pfree(candidate->list); + pfree(candidate); + pfree(oldlist); + } + } + else + { + minentry = candidate; + } + + if (minentry) + { + ginEntryInsert(&buildstate->ginstate, + minentry->attnum, minentry->key, minentry->category, + minentry->list, minentry->nlist, + &buildstate->buildStats); + inserted++; + } + } + + Assert(binaryheap_empty(binheap)); + + binaryheap_free(binheap); + ginInitBA(&buildstate->accum); + return inserted; +} + +/* + * The common function used by both backend and worker to dump the accumulator + * when it gets full. 'last' should be true on the last call, to make the + * backend merge everything from the workers even if the backend has no more + * results from itself. + */ +static void +ginDumpCommon(GinBuildState *buildstate, bool last) +{ + if (IsParallelWorker()) + ginSendTree(buildstate); + else + { + int inserted; + do + { + resetEndOfTree(buildstate); + inserted = mergeTrees(buildstate); + } while (last && (inserted > 0)); + /* + * If it is not the 'last' dump, then the backend can merge the next + * incoming trees on the next dump. But if it is the 'last' dump, the + * backend has to repeat merging until all workers are finished. + */ + } +} + static void ginBuildCallback(Relation index, HeapTuple htup, Datum *values, bool *isnull, bool tupleIsAlive, void *state) @@ -282,53 +679,244 @@ ginBuildCallback(Relation index, HeapTuple htup, Datum *values, /* If we've maxed out our available memory, dump everything to the index */ if (buildstate->accum.allocatedMemory >= (Size)maintenance_work_mem * 1024L) - { - ItemPointerData *list; - Datum key; - GinNullCategory category; - uint32 nlist; - OffsetNumber attnum; - - ginBeginBAScan(&buildstate->accum); - while ((list = ginGetBAEntry(&buildstate->accum, - &attnum, &key, &category, &nlist)) != NULL) - { - /* there could be many entries, so be willing to abort here */ - CHECK_FOR_INTERRUPTS(); - ginEntryInsert(&buildstate->ginstate, attnum, key, category, - list, nlist, &buildstate->buildStats); - } - - MemoryContextReset(buildstate->tmpCtx); - ginInitBA(&buildstate->accum); - } + ginDumpCommon(buildstate, false); MemoryContextSwitchTo(oldCtx); } +/* + * Claim "max_blocks" or less blocks. Return the actual number of claimed + * blocks and set "first" to point to the first block of the claimed range. + * 0 return value means the task has been finished. 
+ */ +static int +claimSomeBlocks(volatile GinBuildTask *task, int max_blocks, int *first) +{ + int blocks = 0; + + SpinLockAcquire(&task->lock); + + if (task->scanned >= task->to_scan) + { + SpinLockRelease(&task->lock); + return 0; + } + + *first = task->scanned; + blocks = max_blocks; + if (blocks > task->to_scan - task->scanned) + blocks = task->to_scan - task->scanned; + task->scanned += blocks; + + SpinLockRelease(&task->lock); + return blocks; +} + +static void +reportReltuples(volatile GinBuildTask *task, double reltuples) +{ + SpinLockAcquire(&task->lock); + task->reltuples += reltuples; + SpinLockRelease(&task->lock); +} + +static double +ginbuildCommon(GinBuildState *buildstate, Relation heap, Relation index, IndexInfo *indexInfo) +{ + double reltuples = 0; + + /* + * create a temporary memory context that is used to hold data not yet + * dumped out to the index + */ + buildstate->tmpCtx = AllocSetContextCreate(CurrentMemoryContext, + "Gin build temporary context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + /* + * create a temporary memory context that is used for calling + * ginExtractEntries(), and can be reset after each tuple + */ + buildstate->funcCtx = AllocSetContextCreate(CurrentMemoryContext, + "Gin build temporary context for user-defined function", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + buildstate->accum.ginstate = &buildstate->ginstate; + ginInitBA(&buildstate->accum); + + /* + * Do the heap scan. We disallow sync scan here because dataPlaceToPage + * prefers to receive tuples in TID order. + */ + while (true) + { + double subtuples; + int first, blocks; + + blocks = claimSomeBlocks(buildstate->task, GIN_BLOCKS_PER_WORKER, &first); + if (blocks == 0) + break; + + subtuples = IndexBuildHeapRangeScan(heap, index, indexInfo, false, false, + first, blocks, + ginBuildCallback, (void *)buildstate); + reltuples += subtuples; + } + + /* dump remaining entries to the index */ + ginDumpCommon(buildstate, true); + + MemoryContextDelete(buildstate->funcCtx); + MemoryContextDelete(buildstate->tmpCtx); + + /* + * Update metapage stats + */ + buildstate->buildStats.nTotalPages = RelationGetNumberOfBlocks(index); + ginUpdateStats(index, &buildstate->buildStats); + + return reltuples; +} + +/* + * The idea behind parallel GIN build is letting the parallel workers scan the + * relation and build rbtree in parallel. Each block is scanned by one of the + * workers. + */ +static void +ginbuildWorker(dsm_segment *seg, shm_toc *toc) +{ + GinBuildState buildstate; + + Relation heap; + Relation index; + IndexInfo *indexInfo; + + double reltuples; + + char *shm_origin; + int mqsize; + + buildstate.task = (GinBuildTask*)shm_toc_lookup(toc, KEY_TASK); + + shm_origin = (char *)shm_toc_lookup(toc, KEY_SHM_ORIGIN); + mqsize = *(int*)shm_toc_lookup(toc, KEY_SHM_PER_WORKER); + buildstate.mq = (shm_mq *)(shm_origin + ParallelWorkerNumber * mqsize); + shm_mq_set_sender(buildstate.mq, MyProc); + buildstate.mqh = shm_mq_attach(buildstate.mq, seg, NULL); + shm_mq_wait_for_attach(buildstate.mqh); + + /* + * NoLock here because the backend has already opened the heap and the + * index. We reopen them in the worker, because we cannot just pass the + * Relation pointers through the shared memory, thus we rely on OIDs. 
+ */ + heap = heap_open(buildstate.task->heap_oid, NoLock); + index = index_open(buildstate.task->index_oid, NoLock); + indexInfo = BuildIndexInfo(index); + + initGinState(&buildstate.ginstate, index); + buildstate.indtuples = 0; + memset(&buildstate.buildStats, 0, sizeof(GinStatsData)); + + reltuples = ginbuildCommon(&buildstate, heap, index, indexInfo); + + index_close(index, NoLock); + heap_close(heap, NoLock); + + reportReltuples(buildstate.task, reltuples); + + shm_mq_detach(buildstate.mq); +} + +/* Initialize the parallel gin building task in the shared memory. */ +static void +initTask(volatile GinBuildTask *task, Relation heap, Relation index) +{ + task->to_scan = RelationGetNumberOfBlocks(heap); + task->scanned = 0; + SpinLockInit(&task->lock); + task->heap_oid = RelationGetRelid(heap); + task->index_oid = RelationGetRelid(index); + task->reltuples = 0; +} + +/* Launch 'wnum' parallel workers to build 'index' on 'heap'. */ +static void +launchWorkers(GinBuildState *buildstate, int wnum, Relation heap, Relation index) +{ + int i; + void *origin; + int *mqsize; + + EnterParallelMode(); + buildstate->pcxt = CreateParallelContext(ginbuildWorker, wnum); + { + int size = 0, keys = 0; + keys++; size += sizeof(GinBuildTask); + keys++; size += gin_shared_mem * 1024; + keys++; size += sizeof(int); /* for mqsize */ + + shm_toc_estimate_chunk(&buildstate->pcxt->estimator, size); + shm_toc_estimate_keys(&buildstate->pcxt->estimator, keys); + } + InitializeParallelDSM(buildstate->pcxt); + + buildstate->task = (GinBuildTask*)shm_toc_allocate(buildstate->pcxt->toc, sizeof(GinBuildTask)); + shm_toc_insert(buildstate->pcxt->toc, KEY_TASK, (GinBuildTask*)buildstate->task); + + origin = shm_toc_allocate(buildstate->pcxt->toc, gin_shared_mem * 1024); + shm_toc_insert(buildstate->pcxt->toc, KEY_SHM_ORIGIN, origin); + + mqsize = (int *)shm_toc_allocate(buildstate->pcxt->toc, sizeof(int)); + *mqsize = gin_shared_mem * 1024 / buildstate->pcxt->nworkers; + shm_toc_insert(buildstate->pcxt->toc, KEY_SHM_PER_WORKER, mqsize); + + initTask(buildstate->task, heap, index); + + buildstate->results = palloc(buildstate->pcxt->nworkers * sizeof(WorkerResult)); + for (i = 0; i < buildstate->pcxt->nworkers; i++) + { + WorkerResult *r = buildstate->results + i; + r->mq = shm_mq_create((char *)origin + i * (*mqsize), *mqsize); + shm_mq_set_receiver(r->mq, MyProc); + r->mqh = shm_mq_attach(r->mq, buildstate->pcxt->seg, NULL); + r->end_of_tree = false; + r->end_of_forest = false; + } + + LaunchParallelWorkers(buildstate->pcxt); +} + +static void +finishWorkers(GinBuildState *buildstate, GinBuildTask *task) +{ + WaitForParallelWorkersToFinish(buildstate->pcxt); + /* copy the task out of the context before destroing it */ + memcpy(task, (GinBuildTask *)buildstate->task, sizeof(GinBuildTask)); + DestroyParallelContext(buildstate->pcxt); + ExitParallelMode(); +} + IndexBuildResult * ginbuild(Relation heap, Relation index, IndexInfo *indexInfo) { + GinBuildState buildstate; + IndexBuildResult *result; - double reltuples; - GinBuildState buildstate; Buffer RootBuffer, MetaBuffer; - ItemPointerData *list; - Datum key; - GinNullCategory category; - uint32 nlist; - MemoryContext oldCtx; - OffsetNumber attnum; + double reltuples = 0; + int wnum = gin_parallel_workers; if (RelationGetNumberOfBlocks(index) != 0) elog(ERROR, "index \"%s\" already contains data", RelationGetRelationName(index)); - initGinState(&buildstate.ginstate, index); - buildstate.indtuples = 0; - memset(&buildstate.buildStats, 0, sizeof(GinStatsData)); - /* 
initialize the meta page */ MetaBuffer = GinNewBuffer(index); @@ -363,60 +951,40 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo) UnlockReleaseBuffer(RootBuffer); END_CRIT_SECTION(); + initGinState(&buildstate.ginstate, index); + buildstate.indtuples = 0; + memset(&buildstate.buildStats, 0, sizeof(GinStatsData)); + /* count the root as first entry page */ buildstate.buildStats.nEntryPages++; - /* - * create a temporary memory context that is used to hold data not yet - * dumped out to the index - */ - buildstate.tmpCtx = AllocSetContextCreate(CurrentMemoryContext, - "Gin build temporary context", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - - /* - * create a temporary memory context that is used for calling - * ginExtractEntries(), and can be reset after each tuple - */ - buildstate.funcCtx = AllocSetContextCreate(CurrentMemoryContext, - "Gin build temporary context for user-defined function", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - - buildstate.accum.ginstate = &buildstate.ginstate; - ginInitBA(&buildstate.accum); + if ((wnum > 0) && RelationUsesLocalBuffers(heap)) + { + elog(DEBUG1, "not using parallel GIN build on temporary table %s\n", RelationGetRelationName(heap)); + wnum = 0; + } - /* - * Do the heap scan. We disallow sync scan here because dataPlaceToPage - * prefers to receive tuples in TID order. - */ - reltuples = IndexBuildHeapScan(heap, index, indexInfo, false, - ginBuildCallback, (void *) &buildstate); + buildstate.pcxt = NULL; - /* dump remaining entries to the index */ - oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx); - ginBeginBAScan(&buildstate.accum); - while ((list = ginGetBAEntry(&buildstate.accum, - &attnum, &key, &category, &nlist)) != NULL) { - /* there could be many entries, so be willing to abort here */ - CHECK_FOR_INTERRUPTS(); - ginEntryInsert(&buildstate.ginstate, attnum, key, category, - list, nlist, &buildstate.buildStats); - } - MemoryContextSwitchTo(oldCtx); + GinBuildTask task; + double backend_reltuples = 0; - MemoryContextDelete(buildstate.funcCtx); - MemoryContextDelete(buildstate.tmpCtx); + if (wnum > 0) + { + launchWorkers(&buildstate, wnum, heap, index); + backend_reltuples += ginbuildCommon(&buildstate, heap, index, indexInfo); + finishWorkers(&buildstate, &task); + } + else + { + buildstate.task = &task; + initTask(buildstate.task, heap, index); + backend_reltuples += ginbuildCommon(&buildstate, heap, index, indexInfo); + } - /* - * Update metapage stats - */ - buildstate.buildStats.nTotalPages = RelationGetNumberOfBlocks(index); - ginUpdateStats(index, &buildstate.buildStats); + reltuples = backend_reltuples + task.reltuples; + } /* * Return statistics diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 65a6cd4..6afbb8a 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2775,6 +2775,31 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"gin_parallel_workers", + PGC_USERSET, + RESOURCES_ASYNCHRONOUS, + gettext_noop("Maximum number of parallel workers for GIN buiding."), + NULL, + }, + &gin_parallel_workers, + 0, 0, MAX_BACKENDS, + NULL, NULL, NULL + }, + + { + {"gin_shared_mem", + PGC_USERSET, + RESOURCES_ASYNCHRONOUS, + gettext_noop("The size of shared memory segment for parallel GIN buiding."), + NULL, + GUC_UNIT_KB + }, + &gin_shared_mem, + 16 * 1024, 1024, MAX_KILOBYTES, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 
0, 0, 0, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 5536012..15bf5ca 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -541,6 +541,8 @@ #xmloption = 'content' #gin_fuzzy_search_limit = 0 #gin_pending_list_limit = 4MB +#gin_parallel_workers = 0 # 0 disables parallel gin build +#gin_shared_mem = 16MB # - Locale and Formatting - diff --git a/src/include/access/gin.h b/src/include/access/gin.h index e5b2e10..91e5b27 100644 --- a/src/include/access/gin.h +++ b/src/include/access/gin.h @@ -68,6 +68,8 @@ typedef char GinTernaryValue; /* GUC parameters */ extern PGDLLIMPORT int GinFuzzySearchLimit; extern int gin_pending_list_limit; +extern int gin_parallel_workers; +extern int gin_shared_mem; /* ginutil.c */ extern void ginGetStats(Relation index, GinStatsData *stats); diff --git a/src/test/regress/expected/gin.out b/src/test/regress/expected/gin.out index a3911a6..fc6bac0 100644 --- a/src/test/regress/expected/gin.out +++ b/src/test/regress/expected/gin.out @@ -23,6 +23,88 @@ select gin_clean_pending_list('gin_test_idx'); -- nothing to flush 0 (1 row) +-- Test parallel building +drop index gin_test_idx; +select count(*) from gin_test_tbl where i @> array[1,2]; + count +------- + 20002 +(1 row) + +select count(*) from gin_test_tbl where i @> array[2]; + count +------- + 20002 +(1 row) + +select count(*) from gin_test_tbl where i @> array[8]; + count +------- + 3 +(1 row) + +set gin_parallel_workers = 1; +create index gin_test_idx on gin_test_tbl using gin (i); +select count(*) from gin_test_tbl where i @> array[1,2]; + count +------- + 20002 +(1 row) + +select count(*) from gin_test_tbl where i @> array[2]; + count +------- + 20002 +(1 row) + +select count(*) from gin_test_tbl where i @> array[8]; + count +------- + 3 +(1 row) + +drop index gin_test_idx; +set gin_parallel_workers = 2; +create index gin_test_idx on gin_test_tbl using gin (i); +select count(*) from gin_test_tbl where i @> array[1,2]; + count +------- + 20002 +(1 row) + +select count(*) from gin_test_tbl where i @> array[2]; + count +------- + 20002 +(1 row) + +select count(*) from gin_test_tbl where i @> array[8]; + count +------- + 3 +(1 row) + +drop index gin_test_idx; +set gin_parallel_workers = 8; +create index gin_test_idx on gin_test_tbl using gin (i); +select count(*) from gin_test_tbl where i @> array[1,2]; + count +------- + 20002 +(1 row) + +select count(*) from gin_test_tbl where i @> array[2]; + count +------- + 20002 +(1 row) + +select count(*) from gin_test_tbl where i @> array[8]; + count +------- + 3 +(1 row) + -- Test vacuuming delete from gin_test_tbl where i @> array[2]; vacuum gin_test_tbl; diff --git a/src/test/regress/sql/gin.sql b/src/test/regress/sql/gin.sql index c566e9b..8b0eb45 100644 --- a/src/test/regress/sql/gin.sql +++ b/src/test/regress/sql/gin.sql @@ -19,6 +19,33 @@ vacuum gin_test_tbl; -- flush the fastupdate buffers select gin_clean_pending_list('gin_test_idx'); -- nothing to flush +-- Test parallel building + +drop index gin_test_idx; +select count(*) from gin_test_tbl where i @> array[1,2]; +select count(*) from gin_test_tbl where i @> array[2]; +select count(*) from gin_test_tbl where i @> array[8]; + +set gin_parallel_workers = 1; +create index gin_test_idx on gin_test_tbl using gin (i); +select count(*) from gin_test_tbl where i @> array[1,2]; +select count(*) from gin_test_tbl where i @> array[2]; +select count(*) from gin_test_tbl where i @> 
array[8]; + +drop index gin_test_idx; +set gin_parallel_workers = 2; +create index gin_test_idx on gin_test_tbl using gin (i); +select count(*) from gin_test_tbl where i @> array[1,2]; +select count(*) from gin_test_tbl where i @> array[2]; +select count(*) from gin_test_tbl where i @> array[8]; + +drop index gin_test_idx; +set gin_parallel_workers = 8; +create index gin_test_idx on gin_test_tbl using gin (i); +select count(*) from gin_test_tbl where i @> array[1,2]; +select count(*) from gin_test_tbl where i @> array[2]; +select count(*) from gin_test_tbl where i @> array[8]; + -- Test vacuuming delete from gin_test_tbl where i @> array[2]; vacuum gin_test_tbl;