Tomas Vondra <tomas.von...@enterprisedb.com> writes:

Hi Tomas,

My review is not finished, but I probably won't have much more time for
it because of internal tasks, so I am sharing what I have done so far,
including a patch whose results were not good.

What I tried to do is:

1) Remove all the "sort" effort for the state->bs_sortstate tuples,
   since their input comes from state->bs_worker_sort, which is sorted
   already.

2) Remove the *partial* "sort" operations between accum.rbtree and
   state->bs_worker_sort, because the tuples in accum.rbtree are sorted
   already.

Both of them can depend on the same API changes.

1. Add two fields to Tuplesortstate:

    struct Tuplesortstate
    {
        ..
    +   bool input_presorted; /* the tuples are presorted. */
    +   bool new_tapes;       /* write the tuples in memory into a new 'run'. */
    }

   The user can set input_presorted in tuplesort_begin_xx(, presorted=true);

2. In tuplesort_puttuple, if memory is full but presorted is true, we
   can (a) avoid the sort, and (b) reuse the existing 'run' to reduce
   the effort of 'mergeruns', unless new_tapes is set to true. Once it
   has switched to a new tape, it sets state->new_tapes to false and
   waits for 3) to change it to true again.

3. tuplesort_dumptuples(..); // dump the tuples in memory and set
   new_tapes=true, which says that *this batch of presorted input is
   done, and the next batch is presorted only within its own batch*.

In the gin-parallel-build case, for case 1) we can just use:

    for tuple in bs_worker_sort:
        tuplesort_putgintuple(state->bs_sortstate, ..);
    tuplesort_dumptuples(..);

In the end we get a) only 1 run in the worker, so the leader has fewer
runs to merge in mergeruns, and b) less sorting both in
tuplesort_performsort and in tuplesort_puttuple_common.

For case 2) we can have:

    for tuple in RBTree.tuples:
        tuplesort_puttuples(tuple);
        // this may cause a dumptuples internally when the memory is full,
        // but it is OK.
    tuplesort_dumptuples(..);

Here we can only remove the "sort" in tuplesort_puttuple_common, and it
probably increases the number of 'runs' in the sortstate, which
increases the effort of mergeruns in the end.
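
As a rough illustration of the run accounting described above, here is a
small toy model in C. It is only a sketch under stated assumptions: it is
not tuplesort.c code, it moves no tuples, and ToySort, toy_begin, toy_put,
toy_spill, toy_end_batch and toy_run are hypothetical names invented for
this example. It only counts how many runs and in-memory sorts result
when memory fills up, with and without the input_presorted / new_tapes
flags.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy model of the run accounting only; NOT PostgreSQL's tuplesort.c. */
typedef struct ToySort
{
	bool	input_presorted;	/* caller promises each batch arrives sorted */
	bool	new_tapes;			/* the next spill must start a new run */
	size_t	mem_limit;			/* number of tuples that fit in "memory" */
	size_t	in_memory;			/* tuples currently buffered */
	int		runs;				/* runs started so far */
	int		sorts;				/* in-memory sorts performed so far */
} ToySort;

static void
toy_begin(ToySort *s, size_t mem_limit, bool presorted)
{
	assert(mem_limit > 0);
	s->input_presorted = presorted;
	s->new_tapes = true;		/* the first spill always opens a run */
	s->mem_limit = mem_limit;
	s->in_memory = 0;
	s->runs = 0;
	s->sorts = 0;
}

/* Write the buffered tuples out, following the three cases in the mail. */
static void
toy_spill(ToySort *s)
{
	if (s->in_memory == 0)
		return;

	if (!s->input_presorted)
	{
		/* unsorted input: sort the buffer, every spill is its own run */
		s->sorts++;
		s->runs++;
	}
	else if (s->new_tapes)
	{
		/* presorted input, first spill of a batch: open a new run */
		s->runs++;
		s->new_tapes = false;
	}
	/* else: presorted input, same batch: append to the current run */

	s->in_memory = 0;
}

/* Add one tuple, spilling first when memory is full. */
static void
toy_put(ToySort *s)
{
	if (s->in_memory == s->mem_limit)
		toy_spill(s);
	s->in_memory++;
}

/* End of a presorted batch: flush, and make the next batch use a new run. */
static void
toy_end_batch(ToySort *s)
{
	toy_spill(s);
	s->new_tapes = true;
}

/* Feed 'batches' batches of 'per_batch' tuples and return the counters. */
ToySort
toy_run(bool presorted, int batches, size_t per_batch, size_t mem_limit)
{
	ToySort		s;

	toy_begin(&s, mem_limit, presorted);
	for (int b = 0; b < batches; b++)
	{
		for (size_t i = 0; i < per_batch; i++)
			toy_put(&s);
		toy_end_batch(&s);
	}
	return s;
}
```

In this model, toy_run(true, 1, 1000, 100) ends with a single run and no
sorts, while toy_run(false, 1, 1000, 100) produces ten runs and ten
sorts. With three presorted batches the run count follows the number of
batches rather than the memory limit, which is the trade-off mentioned
for case 2).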

But the test results are not good. Maybe the 'sort' is not a key factor
here (I did miss the perf step before doing this), or maybe my test data
is too small.

Here is the patch I used for the above, and I used the following SQL to
test:

    CREATE TABLE t(a int[], b numeric[]);

    -- generate 1000 * 1000 rows.
    insert into t select i, n
    from normal_rand_array(1000, 90, 1::int4, 10000::int4) i,
    normal_rand_array(1000, 90, '1.00233234'::numeric, '8.239241989134'::numeric) n;

    alter table t set (parallel_workers=4);
    set debug_parallel_query to on;
    set max_parallel_maintenance_workers to 4;

    create index on t using gin(a);
    create index on t using gin(b);

I found normal_rand_array handy for this case, and I registered it at
https://commitfest.postgresql.org/48/5061/.

Besides the above, I didn't find anything wrong in the current patch,
and the above can be categorized as "future improvement", if it is worth
doing at all.

-- 
Best Regards
Andy Fan

From 48c2e03fd854c8f88f781adc944f37b004db0721 Mon Sep 17 00:00:00 2001
From: Andy Fan <zhihuifan1...@163.com>
Date: Sat, 8 Jun 2024 13:21:08 +0800
Subject: [PATCH v20240702 1/3] Add function normal_rand_array function to contrib/tablefunc.

It can produce an array of numbers with n controllable array length and
duplicated elements in these arrays.
---
 contrib/tablefunc/Makefile                |   2 +-
 contrib/tablefunc/expected/tablefunc.out  |  26 ++++
 contrib/tablefunc/sql/tablefunc.sql       |  10 ++
 contrib/tablefunc/tablefunc--1.0--1.1.sql |   7 ++
 contrib/tablefunc/tablefunc.c             | 140 ++++++++++++++++++++++
 contrib/tablefunc/tablefunc.control       |   2 +-
 doc/src/sgml/tablefunc.sgml               |  10 ++
 src/backend/utils/adt/arrayfuncs.c        |   7 ++
 8 files changed, 202 insertions(+), 2 deletions(-)
 create mode 100644 contrib/tablefunc/tablefunc--1.0--1.1.sql

diff --git a/contrib/tablefunc/Makefile b/contrib/tablefunc/Makefile
index 191a3a1d38..f0c67308fd 100644
--- a/contrib/tablefunc/Makefile
+++ b/contrib/tablefunc/Makefile
@@ -3,7 +3,7 @@
 MODULES = tablefunc
 
 EXTENSION = tablefunc
-DATA = tablefunc--1.0.sql
+DATA = tablefunc--1.0.sql tablefunc--1.0--1.1.sql
 PGFILEDESC = "tablefunc - various functions that return tables"
 
 REGRESS = tablefunc
diff --git a/contrib/tablefunc/expected/tablefunc.out b/contrib/tablefunc/expected/tablefunc.out
index ddece79029..9f0cbbfbbe 100644
--- a/contrib/tablefunc/expected/tablefunc.out
+++ b/contrib/tablefunc/expected/tablefunc.out
@@ -12,6 +12,32 @@ SELECT avg(normal_rand)::int, count(*) FROM normal_rand(100, 250, 0.2);
 -- negative number of tuples
 SELECT avg(normal_rand)::int, count(*) FROM normal_rand(-1, 250, 0.2);
 ERROR:  number of rows cannot be negative
+SELECT count(*), avg(COALESCE(array_length(i, 1), 0)) FROM normal_rand_array(10, 3, 1.23::numeric, 8::numeric) as i;
+ count |        avg         
+-------+--------------------
+    10 | 3.0000000000000000
+(1 row)
+
+SELECT count(*), avg(COALESCE(array_length(i, 1), 0)) FROM normal_rand_array(10, 3, 1.23::int4, 8::int4) as i;
+ count |        avg         
+-------+--------------------
+    10 | 3.0000000000000000
+(1 row)
+
+SELECT count(*), avg(COALESCE(array_length(i, 1), 0)) FROM normal_rand_array(10, 3, 1.23::int8, 8::int8) as i;
+ count |        avg         
+-------+--------------------
+    10 | 3.0000000000000000
+(1 row)
+
+SELECT count(*), avg(COALESCE(array_length(i, 1), 0)) FROM normal_rand_array(10, 3, 1.23::float8, 8::float8) as i;
+ count |        avg         
+-------+--------------------
+    10 | 3.0000000000000000
+(1 row)
+
+SELECT count(*), avg(COALESCE(array_length(i, 1), 0)) FROM normal_rand_array(10, 3, 'abc'::text, 'def'::text) as i;
+ERROR:  unsupported type 25 in normal_rand_array.
 --
 -- crosstab()
 --
diff --git a/contrib/tablefunc/sql/tablefunc.sql b/contrib/tablefunc/sql/tablefunc.sql
index 0fb8e40de2..dec57cfc66 100644
--- a/contrib/tablefunc/sql/tablefunc.sql
+++ b/contrib/tablefunc/sql/tablefunc.sql
@@ -8,6 +8,16 @@ SELECT avg(normal_rand)::int, count(*) FROM normal_rand(100, 250, 0.2);
 -- negative number of tuples
 SELECT avg(normal_rand)::int, count(*) FROM normal_rand(-1, 250, 0.2);
 
+SELECT count(*), avg(COALESCE(array_length(i, 1), 0)) FROM normal_rand_array(10, 3, 1.23::numeric, 8::numeric) as i;
+
+SELECT count(*), avg(COALESCE(array_length(i, 1), 0)) FROM normal_rand_array(10, 3, 1.23::int4, 8::int4) as i;
+
+SELECT count(*), avg(COALESCE(array_length(i, 1), 0)) FROM normal_rand_array(10, 3, 1.23::int8, 8::int8) as i;
+
+SELECT count(*), avg(COALESCE(array_length(i, 1), 0)) FROM normal_rand_array(10, 3, 1.23::float8, 8::float8) as i;
+
+SELECT count(*), avg(COALESCE(array_length(i, 1), 0)) FROM normal_rand_array(10, 3, 'abc'::text, 'def'::text) as i;
+
 --
 -- crosstab()
 --
diff --git a/contrib/tablefunc/tablefunc--1.0--1.1.sql b/contrib/tablefunc/tablefunc--1.0--1.1.sql
new file mode 100644
index 0000000000..9d13e80ff0
--- /dev/null
+++ b/contrib/tablefunc/tablefunc--1.0--1.1.sql
@@ -0,0 +1,7 @@
+-- complain if script is sourced in psql, rather than via ALTER EXTENSION
+\echo Use "ALTER EXTENSION tablefunc UPDATE TO '1.1'" to load this file. \quit
+
+CREATE FUNCTION normal_rand_array(int4, int4, anyelement, anyelement)
+RETURNS setof anyarray
+AS 'MODULE_PATHNAME','normal_rand_array'
+LANGUAGE C VOLATILE STRICT;
diff --git a/contrib/tablefunc/tablefunc.c b/contrib/tablefunc/tablefunc.c
index 7d1b5f5143..6d26aa843b 100644
--- a/contrib/tablefunc/tablefunc.c
+++ b/contrib/tablefunc/tablefunc.c
@@ -42,7 +42,9 @@
 #include "lib/stringinfo.h"
 #include "miscadmin.h"
 #include "tablefunc.h"
+#include "utils/array.h"
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
 
 PG_MODULE_MAGIC;
 
@@ -91,6 +93,13 @@ typedef struct
 	bool use_carry; /* use second generated value */
 } normal_rand_fctx;
 
+typedef struct
+{
+	int carry_len;
+	FunctionCallInfo fcinfo;
+	FunctionCallInfo random_len_fcinfo;
+} normal_rand_array_fctx;
+
 #define xpfree(var_) \
 	do { \
 		if (var_ != NULL) \
@@ -269,6 +278,137 @@ normal_rand(PG_FUNCTION_ARGS)
 		SRF_RETURN_DONE(funcctx);
 }
 
+/*
+ * normal_rand_array - return requested number of random arrays
+ * with a Gaussian (Normal) distribution.
+ *
+ * inputs are int numvals, int mean_len, anyelement minvalue,
+ * anyelement maxvalue returns setof anyelement[]
+ */
+PG_FUNCTION_INFO_V1(normal_rand_array);
+Datum
+normal_rand_array(PG_FUNCTION_ARGS)
+{
+	FuncCallContext *funcctx;
+	uint64 call_cntr;
+	uint64 max_calls;
+	normal_rand_array_fctx *fctx;
+	MemoryContext oldcontext;
+	Datum minvalue, maxvalue;
+	int array_mean_len;
+	Oid target_oid, random_fn_oid;
+
+	array_mean_len = PG_GETARG_INT32(1);
+	minvalue = PG_GETARG_DATUM(2);
+	maxvalue = PG_GETARG_DATUM(3);
+
+	target_oid = get_fn_expr_argtype(fcinfo->flinfo, 2);
+
+	if (target_oid == INT4OID)
+		random_fn_oid = F_RANDOM_INT4_INT4;
+	else if (target_oid == INT8OID)
+		random_fn_oid = F_RANDOM_INT8_INT8;
+	else if (target_oid == FLOAT8OID)
+		random_fn_oid = F_RANDOM_;
+	else if (target_oid == NUMERICOID)
+		random_fn_oid = F_RANDOM_NUMERIC_NUMERIC;
+	else
+		elog(ERROR, "unsupported type %d in normal_rand_array.",
+			 target_oid);
+
+	/* stuff done only on the first call of the function */
+	if (SRF_IS_FIRSTCALL())
+	{
+		int32 num_tuples;
+		FmgrInfo *random_len_flinfo, *random_val_flinfo;
+		FunctionCallInfo random_len_fcinfo, random_val_fcinfo;
+
+		/* create a function context for cross-call persistence */
+		funcctx = SRF_FIRSTCALL_INIT();
+
+		/*
+		 * switch to memory context appropriate for multiple function calls
+		 */
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		/* total number of tuples to be returned */
+		num_tuples = PG_GETARG_INT32(0);
+		if (num_tuples < 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("number of rows cannot be negative")));
+		funcctx->max_calls = num_tuples;
+
+		/* allocate memory for user context */
+		fctx = (normal_rand_array_fctx *) palloc(sizeof(normal_rand_array_fctx));
+
+		random_len_fcinfo = (FunctionCallInfo) palloc0(SizeForFunctionCallInfo(2));
+		random_len_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo));
+		fmgr_info(F_RANDOM_INT4_INT4, random_len_flinfo);
+		InitFunctionCallInfoData(*random_len_fcinfo, random_len_flinfo, 2, InvalidOid, NULL, NULL);
+
+		random_len_fcinfo->args[0].isnull = false;
+		random_len_fcinfo->args[1].isnull = false;
+		random_len_fcinfo->args[0].value = 0;
+		random_len_fcinfo->args[1].value = array_mean_len;
+
+		random_val_fcinfo = (FunctionCallInfo) palloc0(SizeForFunctionCallInfo(2));
+		random_val_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo));
+		fmgr_info(random_fn_oid, random_val_flinfo);
+		InitFunctionCallInfoData(*random_val_fcinfo, random_val_flinfo, 2, InvalidOid, NULL, NULL);
+
+		random_val_fcinfo->args[0].isnull = false;
+		random_val_fcinfo->args[1].isnull = false;
+		random_val_fcinfo->args[0].value = minvalue;
+		random_val_fcinfo->args[1].value = maxvalue;
+
+		fctx->carry_len = -1;
+		fctx->fcinfo = random_val_fcinfo;
+		fctx->random_len_fcinfo = random_len_fcinfo;
+
+		funcctx->user_fctx = fctx;
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	/* stuff done on every call of the function */
+	funcctx = SRF_PERCALL_SETUP();
+
+	call_cntr = funcctx->call_cntr;
+	max_calls = funcctx->max_calls;
+	fctx = funcctx->user_fctx;
+
+	if (call_cntr < max_calls) /* do when there is more left to send */
+	{
+		int array_len;
+		int i;
+		Datum *results;
+
+		if (fctx->carry_len != -1)
+		{
+			array_len = fctx->carry_len;
+			fctx->carry_len = -1;
+		}
+		else
+		{
+			array_len = Int32GetDatum(FunctionCallInvoke(fctx->random_len_fcinfo));
+			fctx->carry_len = 2 * array_mean_len - array_len;
+		}
+
+		results = palloc(array_len * sizeof(Datum));
+
+		for(i = 0; i < array_len; i++)
+			results[i] = FunctionCallInvoke(fctx->fcinfo);
+
+
+		SRF_RETURN_NEXT(funcctx, PointerGetDatum(
+			construct_array_builtin(results, array_len, target_oid)));
+	}
+	else
+		/* do when there is no more left */
+		SRF_RETURN_DONE(funcctx);
+}
+
 /*
  * get_normal_pair()
  * Assigns normally distributed (Gaussian) values to a pair of provided
diff --git a/contrib/tablefunc/tablefunc.control b/contrib/tablefunc/tablefunc.control
index 7b25d16170..9cc6222a4f 100644
--- a/contrib/tablefunc/tablefunc.control
+++ b/contrib/tablefunc/tablefunc.control
@@ -1,6 +1,6 @@
 # tablefunc extension
 comment = 'functions that manipulate whole tables, including crosstab'
-default_version = '1.0'
+default_version = '1.1'
 module_pathname = '$libdir/tablefunc'
 relocatable = true
 trusted = true
diff --git a/doc/src/sgml/tablefunc.sgml b/doc/src/sgml/tablefunc.sgml
index e10fe7009d..014c36b81c 100644
--- a/doc/src/sgml/tablefunc.sgml
+++ b/doc/src/sgml/tablefunc.sgml
@@ -53,6 +53,16 @@
        </para></entry>
       </row>
 
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <function>normal_rand_array</function> ( <parameter>numvals</parameter> <type>integer</type>, <parameter>meanarraylen</parameter> <type>int4</type>, <parameter>minval</parameter> <type>anyelement</type>, <parameter>maxval</parameter> <type>anyelement</type> )
+        <returnvalue>setof anyarray</returnvalue>
+       </para>
+       <para>
+        Produces a set of normally distributed random array of numbers.
+       </para></entry>
+      </row>
+
       <row>
        <entry role="func_table_entry"><para role="func_signature">
         <function>crosstab</function> ( <parameter>sql</parameter> <type>text</type> )
diff --git a/src/backend/utils/adt/arrayfuncs.c b/src/backend/utils/adt/arrayfuncs.c
index d6641b570d..7c95cc05bc 100644
--- a/src/backend/utils/adt/arrayfuncs.c
+++ b/src/backend/utils/adt/arrayfuncs.c
@@ -3397,6 +3397,12 @@ construct_array_builtin(Datum *elems, int nelems, Oid elmtype)
 			elmalign = TYPALIGN_INT;
 			break;
 
+		case FLOAT8OID:
+			elmlen = sizeof(float8);
+			elmbyval = FLOAT8PASSBYVAL;
+			elmalign = TYPALIGN_DOUBLE;
+			break;
+
 		case INT2OID:
 			elmlen = sizeof(int16);
 			elmbyval = true;
@@ -3429,6 +3435,7 @@ construct_array_builtin(Datum *elems, int nelems, Oid elmtype)
 			break;
 
 		case TEXTOID:
+		case NUMERICOID:
 			elmlen = -1;
 			elmbyval = false;
 			elmalign = TYPALIGN_INT;
-- 
2.45.1

From 3acff4722a642c43bad5cd9ac89b81989d32998e Mon Sep 17 00:00:00 2001
From: Andy Fan <zhihuifan1...@163.com>
Date: Sun, 23 Jun 2024 14:31:41 +0000
Subject: [PATCH v20240702 2/3] fix incorrect comments.

---
 src/backend/catalog/index.c | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 55fdde4b24..73bfe5da00 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2958,8 +2958,7 @@ index_build(Relation heapRelation,
 	Assert(PointerIsValid(indexRelation->rd_indam->ambuildempty));
 
 	/*
-	 * Determine worker process details for parallel CREATE INDEX. Currently,
-	 * only btree has support for parallel builds.
+	 * Determine worker process details for parallel CREATE INDEX.
 	 *
 	 * Note that planner considers parallel safety for us.
 	 */
-- 
2.45.1

From 27949647f968fc7914a48ce9c4dae9462c2b7707 Mon Sep 17 00:00:00 2001
From: Andy Fan <zhihuifan1...@163.com>
Date: Tue, 2 Jul 2024 07:40:00 +0800
Subject: [PATCH v20240702 3/3] optimize some sorts on tuplesort.c if the input is sorted.

add input_presorted member in Tuplesortstate to indicate the tuples is
sorted already, it can be 'partially' sorted or 'overall' sorted. Within
input_presorted is set, we can remove the sorts during the
tuplesort_puttuple_common when the memory is full and continue to reuse
the previous 'runs' in the current tape on behalf of mergeruns do less
work, unless caller tells tuplesort.c to puttuple into the next run,
this is the user case where the inputs are just presorted in some
different batches, the side impacts is the number of 'runs' is not
decided by work_mem but decided by users calls.

I also use this optimization to Gin index parallel build, at the stage
of 'bs_worker_sort' -> 'bs_sort_state' where all the inputs are sorted.
so the 'runs' can be reduced to 1 for sure. However the improvements are
not measurable (the sort is too cheap in this case, or the reduce of
'runs' isn't helpful due to my test case?)
---
 src/backend/access/gin/gininsert.c         |  41 +++++++--
 src/backend/utils/sort/tuplesort.c         | 102 ++++++++++++++++++---
 src/backend/utils/sort/tuplesortvariants.c |   6 +-
 src/include/utils/tuplesort.h              |   9 +-
 4 files changed, 132 insertions(+), 26 deletions(-)

diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index 29bca0c54c..df469e3d9e 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -718,7 +718,7 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 		 */
 		state->bs_sortstate = tuplesort_begin_index_gin(maintenance_work_mem,
 														coordinate,
-														TUPLESORT_NONE);
+														TUPLESORT_NONE, false);
 
 		/* scan the relation and merge per-worker results */
 		reltuples = _gin_parallel_merge(state);
@@ -1743,7 +1743,9 @@ _gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort)
 	buffer = GinBufferInit();
 
 	/* sort the raw per-worker data */
+	elog(LOG, "tuplesort_performsort(state->bs_worker_sort); started");
 	tuplesort_performsort(state->bs_worker_sort);
+	elog(LOG, "tuplesort_performsort(state->bs_worker_sort); done");
 
 	/* print some basic info */
 	elog(LOG, "_gin_parallel_scan_and_build raw %zu compressed %zu ratio %.2f%%",
@@ -1754,6 +1756,8 @@ _gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort)
 	state->buildStats.sizeCompressed = 0;
 	state->buildStats.sizeRaw = 0;
 
+	elog(LOG, "start to fill to bs_statesort");
+
 	/*
 	 * Read the GIN tuples from the shared tuplesort, sorted by the key, and
 	 * merge them into larger chunks for the leader to combine.
@@ -1854,6 +1858,8 @@ _gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort)
 		GinBufferReset(buffer);
 	}
 
+	elog(LOG, "finish to fill to bs_sortstate");
+
 	/* relase all the memory */
 	GinBufferFree(buffer);
 
@@ -1894,13 +1900,21 @@ _gin_parallel_scan_and_build(GinBuildState *state,
 	coordinate->nParticipants = -1;
 	coordinate->sharedsort = sharedsort;
 
-	/* Begin "partial" tuplesort */
+	/*
+	 * Begin "partial" tuplesort, the input tuples come from RBTree, so they
+	 * are pre-sorted in batch, however since the batch is too small, we will
+	 * think it is not pre-sorted and let tuplesort_state sort the multi
+	 * batches and ..;
+	 */
 	state->bs_sortstate = tuplesort_begin_index_gin(sortmem, coordinate,
-													TUPLESORT_NONE);
+													TUPLESORT_NONE, true);
 
-	/* Local per-worker sort of raw-data */
+	/*
+	 * Local per-worker sort of raw-data, the input tuples come from
+	 * bs_sortstate, so all the tuples are presorted.
+	 */
 	state->bs_worker_sort = tuplesort_begin_index_gin(sortmem, NULL,
-													  TUPLESORT_NONE);
+													  TUPLESORT_NONE, false);
 
 	/* Join parallel scan */
 	indexInfo = BuildIndexInfo(index);
@@ -1909,6 +1923,7 @@ _gin_parallel_scan_and_build(GinBuildState *state,
 	scan = table_beginscan_parallel(heap,
 									ParallelTableScanFromGinShared(ginshared));
 
+	elog(LOG, "start to fill into bs_worker_start");
 	reltuples = table_index_build_scan(heap, index, indexInfo, true, progress,
 									   ginBuildCallbackParallel, state, scan);
 
@@ -1948,6 +1963,8 @@ _gin_parallel_scan_and_build(GinBuildState *state,
 		ginInitBA(&state->accum);
 	}
 
+	elog(LOG, "end to fill into bs_worker_start");
+
 	/*
 	 * Do the first phase of in-worker processing - sort the data produced by
 	 * the callback, and combine them into much larger chunks and place that
@@ -1955,8 +1972,18 @@ _gin_parallel_scan_and_build(GinBuildState *state,
 	 */
 	_gin_process_worker_data(state, state->bs_worker_sort);
 
-	/* sort the GIN tuples built by this worker */
-	tuplesort_performsort(state->bs_sortstate);
+	elog(LOG, "start to sort bs_sortstate");
+
+	/*
+	 * the tuple is sorted already in bs_worker_sort, so let's dump the left
+	 * tuples into tapes, no sort is needed.
+	 */
+	tuplesort_dump_sortedtuples(state->bs_sortstate);
+
+	/* mark the worker has finished its work. */
+	worker_freeze_result_tape(state->bs_sortstate);
+
+	elog(LOG, "end to sort bs_sortstate");
 
 	state->bs_reltuples += reltuples;
 
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 7c4d6dc106..36e0a77b8d 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -187,6 +187,9 @@ struct Tuplesortstate
 {
 	TuplesortPublic base;
 	TupSortStatus status; /* enumerated value as shown above */
+	bool input_presorted; /* if the input presorted . */
+	bool new_tapes; /* force to selectnewtapes, used with
+					 * input_presorted, see dumptumples */
 	bool bounded; /* did caller specify a maximum number of
 				   * tuples to return? */
 	bool boundUsed; /* true if we made use of a bounded heap */
@@ -475,7 +478,6 @@ static void reversedirection(Tuplesortstate *state);
 static unsigned int getlen(LogicalTape *tape, bool eofOK);
 static void markrunend(LogicalTape *tape);
 static int worker_get_identifier(Tuplesortstate *state);
-static void worker_freeze_result_tape(Tuplesortstate *state);
 static void worker_nomergeruns(Tuplesortstate *state);
 static void leader_takeover_tapes(Tuplesortstate *state);
 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
@@ -643,6 +645,12 @@ qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
 
 Tuplesortstate *
 tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
+{
+	return tuplesort_begin_common_ext(workMem, coordinate, sortopt, false);
+}
+
+Tuplesortstate *
+tuplesort_begin_common_ext(int workMem, SortCoordinate coordinate, int sortopt, bool presorted)
 {
 	Tuplesortstate *state;
 	MemoryContext maincontext;
@@ -742,6 +750,8 @@ tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
 	}
 
 	MemoryContextSwitchTo(oldcontext);
+	state->input_presorted = presorted;
+	state->new_tapes = true;
 
 	return state;
 }
@@ -1846,6 +1856,36 @@ tuplesort_merge_order(int64 allowedMem)
 	return mOrder;
 }
 
+/*
+ * Dump the presorted in-memory tuples into tapes and let next batch
+ * of sorted tuples dump to a new tape.
+ */
+void
+tuplesort_dump_sortedtuples(Tuplesortstate *state)
+{
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
+
+	if (state->tapeset == NULL)
+	{
+		inittapes(state, true);
+	}
+
+	dumptuples(state, true);
+
+	/* add a end-mark for this run. */
+	markrunend(state->destTape);
+
+	/* When dumptuples for the next batch, we need a new_tapes. */
+	state->new_tapes = true;
+
+	/*
+	 * record the result_tape for the sake of worker_freeze_result_tape. where
+	 * 'LogicalTapeFreeze(state->result_tape, &output);' is called.
+	 */
+	state->result_tape = state->outputTapes[0];
+	MemoryContextSwitchTo(oldcontext);
+}
+
 /*
  * Helper function to calculate how much memory to allocate for the read buffer
  * of each input tape in a merge pass.
@@ -2105,6 +2145,7 @@ mergeruns(Tuplesortstate *state)
 	 * don't bother. (The initial input tapes are still in outputTapes. The
 	 * number of input tapes will not increase between passes.)
 	 */
+	elog(INFO, "number of runs %d ", state->currentRun);
 	state->memtupsize = state->nOutputTapes;
 	state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
 														state->nOutputTapes * sizeof(SortTuple));
@@ -2334,6 +2375,10 @@ mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
  *
  * When alltuples = true, dump everything currently in memory. (This case is
  * only used at end of input data.)
+ *
+ * When input_presorted = true and new_tapes=false, dump everything to the
+ * existing tape (rather than select a new tap) to order to reduce the number
+ * of tapes & runs;
  */
 static void
 dumptuples(Tuplesortstate *state, bool alltuples)
@@ -2372,23 +2417,41 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 				 errmsg("cannot have more than %d runs for an external sort",
 						INT_MAX)));
 
-	if (state->currentRun > 0)
-		selectnewtape(state);
+	if (!state->input_presorted)
+	{
+		if (state->currentRun > 0)
+			selectnewtape(state);
 
-	state->currentRun++;
+		state->currentRun++;
 
 #ifdef TRACE_SORT
-	if (trace_sort)
-		elog(LOG, "worker %d starting quicksort of run %d: %s",
-			 state->worker, state->currentRun,
-			 pg_rusage_show(&state->ru_start));
+		if (trace_sort)
+			elog(LOG, "worker %d starting quicksort of run %d: %s",
+				 state->worker, state->currentRun,
+				 pg_rusage_show(&state->ru_start));
 #endif
 
-	/*
-	 * Sort all tuples accumulated within the allowed amount of memory for
-	 * this run using quicksort
-	 */
-	tuplesort_sort_memtuples(state);
+		/*
+		 * Sort all tuples accumulated within the allowed amount of memory for
+		 * this run using quicksort.
+		 */
+		tuplesort_sort_memtuples(state);
+	}
+	else if (state->new_tapes)
+	{
+		if (state->currentRun > 0)
+			selectnewtape(state);
+
+		state->currentRun++;
+		/* let reuse the existing tape next time. */
+		state->new_tapes = false;
+	}
+	else
+	{
+		/*
+		 * We always using the preexisting tape to reduce the number of tapes.
+		 */
+	}
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -2423,7 +2486,14 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 	FREEMEM(state, state->tupleMem);
 	state->tupleMem = 0;
 
-	markrunend(state->destTape);
+	if (!state->input_presorted)
+	{
+		markrunend(state->destTape);
+	}
+	else
+	{
+		/* handle it in tuplesort_dump_sortedtuples */
+	}
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -3043,12 +3113,14 @@ worker_get_identifier(Tuplesortstate *state)
  * There should only be one final output run for each worker, which consists
  * of all tuples that were originally input into worker.
  */
-static void
+void
 worker_freeze_result_tape(Tuplesortstate *state)
 {
 	Sharedsort *shared = state->shared;
 	TapeShare output;
 
+	elog(LOG, "No. of runs %d ", state->currentRun);
+	elog(INFO, "No. of runs %d ", state->currentRun);
 	Assert(WORKER(state));
 	Assert(state->result_tape != NULL);
 	Assert(state->memtupcount == 0);
diff --git a/src/backend/utils/sort/tuplesortvariants.c b/src/backend/utils/sort/tuplesortvariants.c
index 3d5b5ce015..94e098974b 100644
--- a/src/backend/utils/sort/tuplesortvariants.c
+++ b/src/backend/utils/sort/tuplesortvariants.c
@@ -592,10 +592,10 @@ tuplesort_begin_index_brin(int workMem,
 
 Tuplesortstate *
 tuplesort_begin_index_gin(int workMem, SortCoordinate coordinate,
-						  int sortopt)
+						  int sortopt, bool presorted)
 {
-	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
-												   sortopt);
+	Tuplesortstate *state = tuplesort_begin_common_ext(workMem, coordinate,
+													   sortopt, presorted);
 	TuplesortPublic *base = TuplesortstateGetPublic(state);
 
 #ifdef TRACE_SORT
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index 659d551247..26fbb6b757 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -375,6 +375,12 @@ typedef struct
 extern Tuplesortstate *tuplesort_begin_common(int workMem,
 											  SortCoordinate coordinate,
 											  int sortopt);
+extern Tuplesortstate *tuplesort_begin_common_ext(int workMem,
+												  SortCoordinate coordinate,
+												  int sortopt,
+												  bool input_presorted);
+extern void worker_freeze_result_tape(Tuplesortstate *state);
+
 extern void tuplesort_set_bound(Tuplesortstate *state, int64 bound);
 extern bool tuplesort_used_bound(Tuplesortstate *state);
 extern void tuplesort_puttuple_common(Tuplesortstate *state,
@@ -387,6 +393,7 @@ extern bool tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples,
 								 bool forward);
 extern void tuplesort_end(Tuplesortstate *state);
 extern void tuplesort_reset(Tuplesortstate *state);
+extern void tuplesort_dump_sortedtuples(Tuplesortstate *state);
 
 extern void tuplesort_get_stats(Tuplesortstate *state,
 								TuplesortInstrumentation *stats);
@@ -445,7 +452,7 @@ extern Tuplesortstate *tuplesort_begin_index_gist(Relation heapRel,
 extern Tuplesortstate *tuplesort_begin_index_brin(int workMem, SortCoordinate coordinate,
 												  int sortopt);
 extern Tuplesortstate *tuplesort_begin_index_gin(int workMem, SortCoordinate coordinate,
-												 int sortopt);
+												 int sortopt, bool presorted);
 extern Tuplesortstate *tuplesort_begin_datum(Oid datumType,
 											 Oid sortOperator, Oid sortCollation,
 											 bool nullsFirstFlag,
-- 
2.45.1