Re: Re: fix cost subqueryscan wrong parallel cost

2022-04-21 Thread bu...@sohu.com
> > For now, the function cost_subqueryscan always uses the *total* rows even
> > for a parallel path, like this:
> >
> > Gather (rows=3)
> >   Workers Planned: 2
> >   ->  Subquery Scan  (rows=3) -- *total* rows, should equal the subpath's rows
> >         ->  Parallel Seq Scan  (rows=1)
>  
> OK, that's bad.
>  
> > Maybe the code:
> >
> > /* Mark the path with the correct row estimate */
> > if (param_info)
> >     path->path.rows = param_info->ppi_rows;
> > else
> >     path->path.rows = baserel->rows;
> >
> > should change to:
> >
> > /* Mark the path with the correct row estimate */
> > if (path->path.parallel_workers > 0)
> >     path->path.rows = path->subpath->rows;
> > else if (param_info)
> >     path->path.rows = param_info->ppi_rows;
> > else
> >     path->path.rows = baserel->rows;
>  
> Suppose parallelism is not in use and that param_info is NULL. Then,
> is path->subpath->rows guaranteed to be equal to baserel->rows? If
> yes, then we don't need a three-part if statement as you propose
> here and can just change the "else" clause to say path->path.rows =
> path->subpath->rows. If no, then your change gives the wrong answer.
I checked some regression tests; sometimes the subquery scan has a filter,
so path->subpath->rows is guaranteed *not* to be equal to baserel->rows.
If the first patch is wrong, I don't know how to fix this;
it looks like I need someone's help.
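For what it's worth, one possible direction (a sketch only, not the patch from
this thread): keep baserel->rows, which already accounts for the subquery's
filter, and derate it by the planner's usual parallel divisor, the way other
scan costs do. get_parallel_divisor() and clamp_row_est() already exist in
costsize.c:

/* Sketch only: derate the filtered row count for parallel paths. */
if (param_info)
    path->path.rows = param_info->ppi_rows;
else
    path->path.rows = baserel->rows;
if (path->path.parallel_workers > 0)
    path->path.rows = clamp_row_est(path->path.rows /
                                    get_parallel_divisor(&path->path));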



Re: Re: fix cost subqueryscan wrong parallel cost

2022-04-20 Thread bu...@sohu.com
> Sure, but that doesn't make the patch correct. The patch proposes
> that, when parallelism is in use, a subquery scan will produce fewer rows
> than when parallelism is not in use, and that's 100% false. Compare
> this with the case of a parallel sequential scan. If a table contains
> 1000 rows, and we scan it with a regular Seq Scan, the Seq Scan will
> return 1000 rows.  But if we scan it with a Parallel Seq Scan using
> say 4 workers, the number of rows returned in each worker will be
> substantially less than 1000, because 1000 is now the *total* number
> of rows to be returned across *all* processes, and what we need is the
> number of rows returned in *each* process.

For now, the function cost_subqueryscan always uses the *total* rows even for
a parallel path, like this:

Gather (rows=3)
  Workers Planned: 2
  ->  Subquery Scan  (rows=3) -- *total* rows, should equal the subpath's rows
        ->  Parallel Seq Scan  (rows=1)

Maybe the code:

/* Mark the path with the correct row estimate */
if (param_info)
    path->path.rows = param_info->ppi_rows;
else
    path->path.rows = baserel->rows;

should change to:

/* Mark the path with the correct row estimate */
if (path->path.parallel_workers > 0)
    path->path.rows = path->subpath->rows;
else if (param_info)
    path->path.rows = param_info->ppi_rows;
else
    path->path.rows = baserel->rows;
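For reference, the per-process row count Robert describes comes from the
planner's parallel divisor. A paraphrase of get_parallel_divisor() in
costsize.c (from memory, so verify against the source):

/* Paraphrase of costsize.c's get_parallel_divisor(); verify against source. */
static double
get_parallel_divisor(Path *path)
{
    double  parallel_divisor = path->parallel_workers;

    /* A participating leader contributes a shrinking fraction of a worker. */
    if (parallel_leader_participation)
    {
        double  leader_contribution = 1.0 - (0.3 * path->parallel_workers);

        if (leader_contribution > 0)
            parallel_divisor += leader_contribution;
    }

    return parallel_divisor;
}

So for the 1000-row table with 4 workers above, the divisor is 4.0 (the leader
contribution has shrunk to zero) and each process is estimated to return
1000 / 4.0 = 250 rows.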



bu...@sohu.com


Re: Re: fix cost subqueryscan wrong parallel cost

2022-04-15 Thread bu...@sohu.com
> Generally it should be. But there's no subquery scan visible here.
I wrote a patch for distinct/union and aggregate support last year (I want to
restart it).
https://www.postgresql.org/message-id/2021091517250848215321%40sohu.com 
Without this patch, some parallel paths will never be selected.

> Some debugging work shows that the second path is generated but then
> fails when competing with the first path. So if there is something
> wrong, I think cost calculation is the suspicious point.
Maybe, I will check it again.

> Not related to this topic but I noticed another problem from the plan.
> Note the first Sort node which is to unique-ify the result of the UNION.
> Why cannot we re-arrange the sort keys from (a, b, c) to (a, c, b) so
> that we can avoid the second Sort node?
This is a regression test, just for testing the Incremental Sort plan.



fix cost subqueryscan wrong parallel cost

2022-04-12 Thread bu...@sohu.com
The cost_subqueryscan function does not check whether the path is parallel.
From the regress tests:
-- Incremental sort vs. set operations with varno 0
set enable_hashagg to off;
explain (costs off) select * from t union select * from t order by 1,3;
                     QUERY PLAN
-----------------------------------------------------
 Incremental Sort
   Sort Key: t.a, t.c
   Presorted Key: t.a
   ->  Unique
         ->  Sort
               Sort Key: t.a, t.b, t.c
               ->  Append
                     ->  Gather
                           Workers Planned: 2
                           ->  Parallel Seq Scan on t
                     ->  Gather
                           Workers Planned: 2
                           ->  Parallel Seq Scan on t t_1
to
 Incremental Sort
   Sort Key: t.a, t.c
   Presorted Key: t.a
   ->  Unique
         ->  Sort
               Sort Key: t.a, t.b, t.c
               ->  Gather
                     Workers Planned: 2
                     ->  Parallel Append
                           ->  Parallel Seq Scan on t
                           ->  Parallel Seq Scan on t t_1
Obviously the latter is less expensive.



bu...@sohu.com


fix-cost_subqueryscan-get-wrong-parallel-cost.patch
Description: Binary data


Re: Re: parallel distinct union and aggregate support patch

2021-07-21 Thread bu...@sohu.com
Sorry, this email was marked as spam by sohu, so I didn't notice it; for the
last few months I have been working hard on merging PostgreSQL 14 into our
cluster version (github.com/ADBSQL/AntDB).

I have an idea of how to make "Parallel Redistribute" work, even under
"Parallel Append" and "Nestloop", but "grouping sets" cannot work in parallel
mode using "Parallel Redistribute".
Please wait a few days; a patch is coming soon.


 
From: David Rowley
Date: 2021-07-06 10:48
To: bu...@sohu.com
CC: David Steele; pgsql-hackers; tgl; Dilip Kumar; Thomas Munro; Tomas Vondra; 
hlinnaka; robertmhaas; pgsql
Subject: Re: Re: parallel distinct union and aggregate support patch
On Tue, 30 Mar 2021 at 22:33, bu...@sohu.com  wrote:
> I have written a plan with similar functionality. It is known that the
> following two situations do not work well.
 
I read through this thread and also wondered about a Parallel
Partition type operator.  It also seems to me that if it could be done
this way then you could just plug in existing nodes to get Sorting and
Aggregation rather than having to modify existing nodes to get them to
do what you need.
 
From what I've seen looking over the thread, a few people suggested
this and I didn't see anywhere where you responded to them about the
idea.  Just so you're aware, contributing to PostgreSQL is not a case
of throwing code at a wall and seeing which parts stick.  You need to
interact and respond to people reviewing your work. This is especially
true for the people who actually have the authority to merge any of
your work with the main code repo.
 
It seems to me you might be getting off to a bad start and you might
not be aware of this process. So I hope this email will help put you
on track.
 
Some of the people that you've not properly responded to include:
 
Thomas Munro:  PostgreSQL committer. Wrote Parallel Hash Join.
Robert Haas: PostgreSQL committer. Wrote much of the original parallel
query code.
Heikki Linnakangas: PostgreSQL committer. Worked on many parts of the
planner and executor. Also works for the company that develops
Greenplum, a massively parallel processing RDBMS, based on Postgres.
 
You might find other information in [1].
 
If I wanted to do what you want to do, I think those 3 people might be
some of the last people I'd pick to ignore questions from! :-)
 
Also, I'd say also copying in Tom Lane randomly when he's not shown
any interest in the patch here is likely not a good way of making
forward progress.  You might find that it might bubble up on his radar
if you start constructively interacting with the people who have
questioned your design.  I'd say that should be your next step.
 
The probability of anyone merging any of your code without properly
discussing the design with the appropriate people is either very
close to zero or actually zero.
 
I hope this email helps you get on track.
 
David
 
[1] https://www.postgresql.org/community/contributors/


Re: Re: parallel distinct union and aggregate support patch

2021-03-30 Thread bu...@sohu.com
> This patch has not gotten any review in the last two CFs and is unlikely
> to be committed for PG14 so I have moved it to the 2021-07 CF. A rebase
> is also required so marked Waiting for Author.
>  
> I can see this is a work in progress, but you may want to consider the
> several suggestions that an unbuffered approach might be better.

I have written a plan with similar functionality. It is known that the
following two situations do not work well.
1. Under "Parallel Append" plan
  Gather
  -> Parallel Append
  -> Agg
  -> Parallel Redistribute(1)
  -> ...
  -> Agg
  -> Parallel Redistribute(2)
  -> ...
  when parallel worker 1 execute "Parallel Redistribute(1)" and worker execute 
"Parallel Redistribute(2)",
  both "Parallel Redistribute" plan can not send tuples to other worker(both 
worker are stuck),
  because outher worker's memory buffer run out soon.

2. Under "Nestloop" plan
  Gather
  -> Nestloop(1)
  -> Nestloop(2)
  -> Parallel Redistribute
  -> ...
  -> IndexScan
  -> Agg
  At some point might be the case: parallel worker 1 executing Agg and 
"Parallel Redistribute" plan's memory buffer is full,
  worker 2 executing "Parallel Redistribute" and it waiting worker 1 eat 
"Parallel Redistribute" plan's memory buffer,
  it's stuck.




bu...@sohu.com



Re: POC: converting Lists into arrays

2021-03-08 Thread bu...@sohu.com
Hello, here are some macros for list_make; now we can use
list_make(...), not list_make1/2/3 ...

#define MACRO_ARGS(...) __VA_ARGS__
#define LIST_MAKE_1_(narg_, postfix_, ...) list_make ## narg_ ## postfix_(__VA_ARGS__)
#define LIST_MAKE_2_(...) LIST_MAKE_1_(__VA_ARGS__)
#define LIST_MAKE_3_(...) LIST_MAKE_2_(__VA_ARGS__)

#define list_make(...) LIST_MAKE_3_(MACRO_ARGS VA_ARGS_NARGS(__VA_ARGS__), /*empty*/, __VA_ARGS__)
#define list_make_int(...) LIST_MAKE_3_(MACRO_ARGS VA_ARGS_NARGS(__VA_ARGS__), _int, __VA_ARGS__)
#define list_make_oid(...) LIST_MAKE_3_(MACRO_ARGS VA_ARGS_NARGS(__VA_ARGS__), _oid, __VA_ARGS__)

The macro VA_ARGS_NARGS is defined in c.h.

How it works, for list_make_int(4,5,6):
step 1: LIST_MAKE_3_(MACRO_ARGS VA_ARGS_NARGS(4,5,6), _int, 4,5,6)
step 2: LIST_MAKE_2_(MACRO_ARGS (3), _int, 4,5,6)
step 3: LIST_MAKE_1_(3, _int, 4,5,6)
step 4: list_make3_int(4,5,6)
step 5: list_make3_impl(T_IntList, ((ListCell) {.int_value = (4)}), ((ListCell) {.int_value = (5)}), ((ListCell) {.int_value = (6)}))

Or we can define some public macros, like this:
#define MACRO_ARGS(...) __VA_ARGS__
#define MACRO_COMBIN_1(prefix_, center_, postfix_, ...) prefix_ ## center_ ## postfix_(__VA_ARGS__)
#define MACRO_COMBIN_2(...) MACRO_COMBIN_1(__VA_ARGS__)
#define MACRO_COMBIN_3(...) MACRO_COMBIN_2(__VA_ARGS__)

#define list_make(...) MACRO_COMBIN_3(list_make, MACRO_ARGS VA_ARGS_NARGS(__VA_ARGS__), /*empty*/, __VA_ARGS__)
#define list_make_int(...) MACRO_COMBIN_3(list_make, MACRO_ARGS VA_ARGS_NARGS(__VA_ARGS__), _int, __VA_ARGS__)
#define list_make_oid(...) MACRO_COMBIN_3(list_make, MACRO_ARGS VA_ARGS_NARGS(__VA_ARGS__), _oid, __VA_ARGS__)
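A hypothetical usage example, assuming the macros above and VA_ARGS_NARGS are
in scope:

/* Hypothetical usage of the list_make macros above. */
List *ptrs = list_make(makeString("a"), makeString("b")); /* -> list_make2(...) */
List *ints = list_make_int(4, 5, 6);      /* -> list_make3_int(4, 5, 6) */
List *oids = list_make_oid(InvalidOid);   /* -> list_make1_oid(InvalidOid) */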



bu...@sohu.com
 
From: Tom Lane
Date: 2019-02-24 10:24
To: pgsql-hackers
Subject: POC: converting Lists into arrays
For quite some years now there's been dissatisfaction with our List
data structure implementation.  Because it separately palloc's each
list cell, it chews up lots of memory, and it's none too cache-friendly
because the cells aren't necessarily adjacent.  Moreover, our typical
usage is to just build a list by repeated lappend's and never modify it,
so that the flexibility of having separately insertable/removable list
cells is usually wasted.
 
Every time this has come up, I've opined that the right fix is to jack
up the List API and drive a new implementation underneath, as we did
once before (cf commit d0b4399d81).  I thought maybe it was about time
to provide some evidence for that position, so attached is a POC patch
that changes Lists into expansible arrays, while preserving most of
their existing API.
 
The big-picture performance change is that this makes list_nth()
a cheap O(1) operation, while lappend() is still pretty cheap;
on the downside, lcons() becomes O(N), as does insertion or deletion
in the middle of a list.  But we don't use lcons() very much
(and maybe a lot of the existing usages aren't really necessary?),
while insertion/deletion in long lists is a vanishingly infrequent
operation.  Meanwhile, making list_nth() cheap is a *huge* win.
 
The most critical thing that we lose by doing this is that when a
List is modified, all of its cells may need to move, which breaks
a lot of code that assumes it can insert or delete a cell while
hanging onto a pointer to a nearby cell.  In almost every case,
this takes the form of doing list insertions or deletions inside
a foreach() loop, and the pointer that's invalidated is the loop's
current-cell pointer.  Fortunately, with list_nth() now being so cheap,
we can replace these uses of foreach() with loops using an integer
index variable and fetching the next list element directly with
list_nth().  Most of these places were loops around list_delete_cell
calls, which I replaced with a new function list_delete_nth_cell
to go along with the emphasis on the integer index.
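To illustrate that rewrite (should_drop is a hypothetical predicate;
list_nth, list_length, and list_delete_nth_cell are the real List API):

/* Delete matching cells by integer index instead of inside foreach(). */
int i = 0;

while (i < list_length(lst))
{
    if (should_drop(list_nth(lst, i)))       /* hypothetical predicate */
        lst = list_delete_nth_cell(lst, i);  /* later cells shift down */
    else
        i++;
}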
 
I don't claim to have found every case where that could happen,
although I added debug support in list.c to force list contents
to move on every list modification, and this patch does pass
check-world with that support turned on.  I fear that some such
bugs remain, though.
 
There is one big way in which I failed to preserve the old API
syntactically: lnext() now requires a pointer to the List as
well as the current ListCell, so that it can figure out where
the end of the cell array is.  That requires touching something
like 150 places that otherwise wouldn't have had to be touched,
which is annoying, even though most of those changes are trivial.
 
I thought about avoiding that by requiring Lists to keep a "sentinel"
value in the cell after the end of the active array, so that lnext()
could look for the sentinel to detect list end.  However, that idea
doesn't really work, because if the list array has been moved, the
spot where the sentinel had been could have been reallocated and
filled with something else.  So this'd offer no defense against the
possibility of a stale ListCell pointer, which is something that
we absolutely need to defend against.

Re: Re: parallel distinct union and aggregate support patch

2020-11-30 Thread bu...@sohu.com

> 1.
> +#define BATCH_SORT_MAX_BATCHES 512
>  
> Did you decide this number based on some experiment or is there some
> analysis behind selecting this number?
When there are too few batches, a single process working too slowly causes an
unbalanced load.
When there are too many batches, FDs open and close files frequently.

> 2.
> +BatchSortState* ExecInitBatchSort(BatchSort *node, EState *estate, int 
> eflags)
> +{
> + BatchSortState *state;
> + TypeCacheEntry *typentry;
> 
> + for (i=0;i<node->numGroupCols;++i)
> + {
> ...
> + InitFunctionCallInfoData(*fcinfo, flinfo, 1, attr->attcollation, NULL, 
> NULL);
> + fcinfo->args[0].isnull = false;
> + state->groupFuns = lappend(state->groupFuns, fcinfo);
> + }
>  
> From the variable naming, it appeared like the batch sort is dependent
> upon the grouping node.  I think instead of using the name
> numGroupCols and groupFuns we need to use names that are more relevant
> to the batch sort something like numSortKey.
Not all data types support both sorting and hashing calculations (for example,
user-defined data types).
We do not need every column to support hash calculation when we batch, so I
used two variables.

> 3.
> + if (eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))
> + {
> + /* for now, we only using in group aggregate */
> + ereport(ERROR,
> + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> + errmsg("not support execute flag(s) %d for group sort", eflags)));
> + }
>  
> Instead of ereport, you should just put an Assert for the unsupported
> flag or elog.
In fact, this is an unfinished feature; BatchSort should also support these
flags. You are welcome to add that support.

> 4.
> + state = makeNode(BatchSortState);
> + state->ps.plan = (Plan*) node;
> + state->ps.state = estate;
> + state->ps.ExecProcNode = ExecBatchSortPrepare;
>  
> I think the main executor entry function should be named ExecBatchSort
> instead of ExecBatchSortPrepare, it will look more consistent with the
> other executor machinery.
The job of the ExecBatchSortPrepare function is to preprocess the data (batch
and pre-sort); when its work ends, it calls "ExecSetExecProcNode(pstate,
ExecBatchSort)" to hand tuple return over to the ExecBatchSort function.
There is another advantage to dividing this into two functions:
it is not necessary to check whether the tuplesort is ready every time the
node is executed, which gives a subtle performance improvement.
And I think this code is clearer.
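A minimal hedged sketch of that two-phase pattern (helper names hypothetical;
ExecSetExecProcNode is the real executor hook):

static TupleTableSlot *
ExecBatchSort(PlanState *pstate)
{
    /* Steady state: batches are already built and sorted. */
    return FetchFromCurrentBatch((BatchSortState *) pstate); /* hypothetical */
}

static TupleTableSlot *
ExecBatchSortPrepare(PlanState *pstate)
{
    /* One-time work: route tuples into batches and pre-sort them. */
    BuildAndSortBatches((BatchSortState *) pstate);          /* hypothetical */

    /* Swap the entry point so later calls skip the readiness check. */
    ExecSetExecProcNode(pstate, ExecBatchSort);
    return ExecBatchSort(pstate);
}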



Re: Re: parallel distinct union and aggregate support patch

2020-10-29 Thread bu...@sohu.com
> 1) It's better to always include the whole patch series - including the
> parts that have not changed. Otherwise people have to scavenge the
> thread and search for all the pieces, which may be a source of issues.
> Also, it confuses the patch tester [1] which tries to apply patches from
> a single message, so it will fail for this one.
 Patches 3 and 4 do not rely on 1 and 2 in code.
 But applying patches 3 and 4 directly will fail, because they were written
 on top of 1 and 2.
 I can generate a new single patch if you need one.

> 2) I suggest you try to describe the goal of these patches, using some
> example queries, explain output etc. Right now the reviewers have to
> reverse engineer the patches and deduce what the intention was, which
> may be causing unnecessary confusion etc. If this was my patch, I'd try
> to create a couple examples (CREATE TABLE + SELECT + EXPLAIN) showing
> how the patch changes the query plan, showing speedup etc.
 I wrote some example queries into the regress tests, including "unique",
 "union", "group by", and "group by grouping sets".
 Here are my tests; they are not in regress:
```sql
begin;
create table gtest(id integer, txt text);
insert into gtest select t1.id,'txt'||t1.id from (select generate_series(1,1000*1000) id) t1,(select generate_series(1,10) id) t2;
analyze gtest;
commit;
set jit = off;
\timing on
```
normal aggregate times
```
set enable_batch_hashagg = off;
explain (costs off,analyze,verbose)
select sum(id),txt from gtest group by txt;
                                   QUERY PLAN
---------------------------------------------------------------------------------
 Finalize GroupAggregate (actual time=6469.279..8947.024 rows=100 loops=1)
   Output: sum(id), txt
   Group Key: gtest.txt
   ->  Gather Merge (actual time=6469.245..8165.930 rows=158 loops=1)
         Output: txt, (PARTIAL sum(id))
         Workers Planned: 2
         Workers Launched: 2
         ->  Sort (actual time=6356.471..7133.832 rows=53 loops=3)
               Output: txt, (PARTIAL sum(id))
               Sort Key: gtest.txt
               Sort Method: external merge  Disk: 11608kB
               Worker 0:  actual time=6447.665..7349.431 rows=317512 loops=1
                 Sort Method: external merge  Disk: 10576kB
               Worker 1:  actual time=6302.882..7061.157 rows=01 loops=1
                 Sort Method: external merge  Disk: 2kB
               ->  Partial HashAggregate (actual time=2591.487..4430.437 rows=53 loops=3)
                     Output: txt, PARTIAL sum(id)
                     Group Key: gtest.txt
                     Batches: 17  Memory Usage: 4241kB  Disk Usage: 113152kB
                     Worker 0:  actual time=2584.345..4486.407 rows=317512 loops=1
                       Batches: 17  Memory Usage: 4241kB  Disk Usage: 101392kB
                     Worker 1:  actual time=2584.369..4393.244 rows=01 loops=1
                       Batches: 17  Memory Usage: 4241kB  Disk Usage: 112832kB
                     ->  Parallel Seq Scan on public.gtest (actual time=0.691..603.990 rows=333 loops=3)
                           Output: id, txt
                           Worker 0:  actual time=0.104..607.146 rows=3174970 loops=1
                           Worker 1:  actual time=0.100..603.951 rows=3332785 loops=1
 Planning Time: 0.226 ms
 Execution Time: 9021.058 ms
(29 rows)

Time: 9022.251 ms (00:09.022)

set enable_batch_hashagg = on;
explain (costs off,analyze,verbose)
select sum(id),txt from gtest group by txt;
                                  QUERY PLAN
-------------------------------------------------------------------------------
 Gather (actual time=3116.666..5740.826 rows=100 loops=1)
   Output: (sum(id)), txt
   Workers Planned: 2
   Workers Launched: 2
   ->  Parallel BatchHashAggregate (actual time=3103.181..5464.948 rows=33 loops=3)
         Output: sum(id), txt
         Group Key: gtest.txt
         Worker 0:  actual time=3094.550..5486.992 rows=326082 loops=1
         Worker 1:  actual time=3099.562..5480.111 rows=324729 loops=1
         ->  Parallel Seq Scan on public.gtest (actual time=0.791..656.601 rows=333 loops=3)
               Output: id, txt
               Worker 0:  actual time=0.080..646.053 rows=3057680 loops=1
               Worker 1:  actual time=0.070..662.754 rows=3034370 loops=1
 Planning Time: 0.243 ms
 Execution Time: 5788.981 ms
(15 rows)

Time: 5790.143 ms (00:05.790)
```

grouping sets times
```
set enable_batch_hashagg = off;
explain (costs off,analyze,verbose)
select sum(id),txt from gtest group by grouping sets(id,txt,());
                              QUERY PLAN
--------------------------------------------------------------------
 GroupAggregate (actual time=9454.707..38921.885 rows=201 loops=1)
   Output: sum(id), txt, id
   Group Key: gtest.id
   Group Key: ()
   Sort Key: gtest.txt
 

Re: parallel distinct union and aggregate support patch

2020-10-28 Thread bu...@sohu.com
Hi,
Here is a patch for parallel distinct, union, aggregate, and grouping sets
support using batch hash agg.
Please review.

how to use:
set enable_batch_hashagg = on

how it works:
like batch sort, but it does not sort each batch; it just saves the hash value
with each row

unfinished work:
rescan is not supported yet; you are welcome to add it. Actually I don't
really understand how rescan works in parallel mode.

other:
patch 1 is based on branch master (80f8eb79e24d9b7963eaf17ce846667e2c6b6e6f)
for patches 1 and 2 see
https://www.postgresql.org/message-id/2020101922424962544053%40sohu.com 
patch 3:
 expands the shared tuple store and adds a batch store module.
 By the way, it uses atomic operations instead of an LWLock to get the shared
tuple store's next read page.
patch 4:
 uses batch hash agg to support parallelism
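A hedged sketch of the atomic get-next-read-page idea (the field name is
hypothetical; pg_atomic_fetch_add_u32 is the real primitive from
port/atomics.h):

/* old: LWLockAcquire(&p->lock, ...); page = p->read_page++; LWLockRelease() */
/* new: each backend claims a distinct page number without blocking */
uint32 pageno = pg_atomic_fetch_add_u32(&shared->read_page, 1);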

 
From: bu...@sohu.com
Date: 2020-10-19 22:42
To: pgsql-hackers
Subject: parallel distinct union and aggregate support patch
Hi hackers,
I wrote a patch to support parallel distinct, union, and aggregate using batch
sort.
steps:
 1. generate a hash value for the group clause values, and use the hash value
modulo the batch count to save each row to a batch
 2. at the end of the outer plan, wait for all other workers to finish writing
to the batches
 3. each worker gets a unique batch number and calls tuplesort_performsort()
to finish that batch's sort
 4. return the rows of this batch
 5. if not all batches are done, go to step 3

The BatchSort plan makes sure that the same tuples (by group clause) are
returned in the same range, so a Unique (or GroupAggregate) plan can work.

Patch 2 is for parallel aggregate; this is a simple use.
But regress fails for partitionwise aggregation because the plan changes
from GatherMerge->Sort->Append->...
to   Sort->Gather->Append->...
I have no idea how to fix it.

Using the same idea, I wrote a batch shared tuple store for HashAgg in our PG
version; I will send a patch for PG14 when I finish it.


The following is the same description, translated from Chinese:
My English is not good, so I am writing some Chinese here; if anything above
is wrong, please help correct it.
How BatchSort works:
 1. Compute a hash value from the group clause and place each row into a batch
by the hash value modulo the batch count.
 2. After the lower plan has returned all rows, wait for all the other worker
processes to finish.
 3. Each worker process claims a unique batch and calls tuplesort_performsort()
to complete the final sort.
 4. Return all rows of this batch.
 5. If not all batches have been read, go back to step 3.
The BatchSort plan guarantees that identical data (by the grouping expression)
is returned within the same cycle, so the distinct- and grouping-related plans
can work correctly.
The second patch supports parallel grouping: grouping is done only once,
instead of each parallel process grouping and the leader process then grouping
a second time.
That patch causes the partitionwise aggregation regression test to fail; the
original execution plan has changed.
The patch implements only one simple way of using the BatchSort plan; other
usages may need to be added.

Using the same idea, I wrote a HashAgg that uses a shared tuple store in our
AntDB version (the latest version is not yet open source); I will post it
after adapting it to PG14.
A small advertisement: welcome to follow AntDB, AsiaInfo's PG-based
distributed database product, open source at https://github.com/ADBSQL/AntDB


bu...@sohu.com


0003-extpand-shared-tuple-store-and-add-batch-store-modul.patch
Description: Binary data


0004-Parallel-distinct-union-aggregate-and-grouping-sets-.patch
Description: Binary data


Re: Re: parallel distinct union and aggregate support patch

2020-10-27 Thread bu...@sohu.com
> On Tue, Oct 27, 2020 at 3:27 PM Dilip Kumar  wrote:
> >
> > On Fri, Oct 23, 2020 at 11:58 AM bu...@sohu.com  wrote:
> > >
> > > > Interesting idea.  So IIUC, whenever a worker is scanning the tuple it
> > > > will directly put it into the respective batch(shared tuple store),
> > > > based on the hash on grouping column and once all the workers are
> > > > doing preparing the batch then each worker will pick those baches one
> > > > by one, perform sort and finish the aggregation.  I think there is a
> > > > scope of improvement that instead of directly putting the tuple to the
> > > > batch what if the worker does the partial aggregations and then it
> > > > places the partially aggregated rows in the shared tuple store based
> > > > on the hash value and then the worker can pick the batch by batch.  By
> > > > doing this way, we can avoid doing large sorts.  And then this
> > > > approach can also be used with the hash aggregate, I mean the
> > > > partially aggregated data by the hash aggregate can be put into the
> > > > respective batch.
> > >
> > > Good idea. Batch sort is suitable when there are many aggregate result
> > > rows; with a large aggregate result, partial aggregation may run out of
> > > memory, and all aggregate functions must support partial aggregation
> > > (with batch sort this is unnecessary).
> > >
> > > Actually I wrote a batch hash store for hash aggregate (for pg11) based
> > > on this idea, but it does not write partial aggregations to the shared
> > > tuple store; it writes the original tuple and its hash value to the
> > > shared tuple store. It does not support parallel grouping sets, though.
> > > I am trying to write parallel hash aggregate support using a batch
> > > shared tuple store for PG14, and it needs to support parallel grouping
> > > sets hash aggregate.
> >
> > I was trying to look into this patch to understand the logic in more
> > detail.  Actually, there are no comments at all so it's really hard to
> > understand what the code is trying to do.
> >
> > I was reading the below functions, which is the main entry point for
> > the batch sort.
> >
> > +static TupleTableSlot *ExecBatchSortPrepare(PlanState *pstate)
> > +{
> > ...
> > + for (;;)
> > + {
> > ...
> > + tuplesort_puttupleslot(state->batches[hash%node->numBatches], slot);
> > + }
> > +
> > + for (i=node->numBatches;i>0;)
> > + tuplesort_performsort(state->batches[--i]);
> > +build_already_done_:
> > + if (parallel)
> > + {
> > + for (i=node->numBatches;i>0;)
> > + {
> > + --i;
> > + if (state->batches[i])
> > + {
> > + tuplesort_end(state->batches[i]);
> > + state->batches[i] = NULL;
> > + }
> > + }
> >
> > I did not understand this part, that once each worker has performed
> > their local batch-wise sort why we are clearing the baches?  I mean
> > individual workers have their on batches so eventually they supposed
> > to get merged.  Can you explain this part and also it will be better
> > if you can add the comments.
>  
> I think I got this.  IIUC, each worker is initializing the shared
> sort and performing the batch-wise sorting, and we will wait on a
> barrier so that all the workers can finish with their sorting.  Once
> that is done the workers will coordinate and pick the batches one by
> one and perform the final merge for each batch.

Yes, it is. Each worker opens the shared sort as a "worker"
(nodeBatchSort.c:134); after all workers have performed their sorts, each
picks one batch and opens it as the "leader" (nodeBatchSort.c:54).



Re: Re: parallel distinct union and aggregate support patch

2020-10-23 Thread bu...@sohu.com
> Interesting idea.  So IIUC, whenever a worker is scanning the tuple it
> will directly put it into the respective batch(shared tuple store),
> based on the hash on grouping column and once all the workers are
> doing preparing the batch then each worker will pick those baches one
> by one, perform sort and finish the aggregation.  I think there is a
> scope of improvement that instead of directly putting the tuple to the
> batch what if the worker does the partial aggregations and then it
> places the partially aggregated rows in the shared tuple store based
> on the hash value and then the worker can pick the batch by batch.  By
> doing this way, we can avoid doing large sorts.  And then this
> approach can also be used with the hash aggregate, I mean the
> partially aggregated data by the hash aggregate can be put into the
> respective batch.

Good idea. Batch sort is suitable when there are many aggregate result rows;
with a large aggregate result, partial aggregation may run out of memory, and
all aggregate functions must support partial aggregation (with batch sort this
is unnecessary).

Actually I wrote a batch hash store for hash aggregate (for pg11) based on
this idea, but it does not write partial aggregations to the shared tuple
store; it writes the original tuple and its hash value to the shared tuple
store. It does not support parallel grouping sets, though.
I am trying to write parallel hash aggregate support using a batch shared
tuple store for PG14, and it needs to support parallel grouping sets hash
aggregate.


Re: Re: parallel distinct union and aggregate support patch

2020-10-22 Thread bu...@sohu.com
> If I understood correctly, the tuples emitted by Parallel Batch Sort
> in each process are ordered by (hash(key, ...) % npartitions, key,
> ...), but the path is claiming to be ordered by (key, ...), no?
> That's enough for Unique and Aggregate to give the correct answer,
> because they really only require equal keys to be consecutive (and in
> the same process), but maybe some other plan could break?

The path does not claim to be ordered by (key, ...); the path saves the
PathKey(s) in BatchSortPath::batchkeys, not Path::pathkeys.
I don't understand "but maybe some other plan could break". Do you mean some
other path using this path? No, BatchSortPath is only for some special paths
(Unique, GroupAgg, ...).



bu...@sohu.com
 
From: Thomas Munro
Date: 2020-10-21 12:27
To: bu...@sohu.com
CC: pgsql-hackers
Subject: Re: parallel distinct union and aggregate support patch
On Tue, Oct 20, 2020 at 3:49 AM bu...@sohu.com  wrote:
> I wrote a patch to support parallel distinct, union and aggregate using
> batch sort.
> steps:
>  1. generate a hash value for the group clause values, and use the hash
> value modulo the batch count to save each row to a batch
>  2. at the end of the outer plan, wait for all other workers to finish
> writing to the batches
>  3. each worker gets a unique batch number and calls tuplesort_performsort()
> to finish that batch's sort
>  4. return the rows of this batch
>  5. if not all batches are done, go to step 3
>
> The BatchSort plan makes sure that the same tuples (by group clause) are
> returned in the same range, so a Unique (or GroupAggregate) plan can work.
 
Hi!
 
Interesting work!  In the past a few people have speculated about a
Parallel Repartition operator that could partition tuples a bit like
this, so that each process gets a different set of partitions.  Here
you combine that with a sort.  By doing both things in one node, you
avoid a lot of overheads (writing into a tuplestore once in the
repartitioning node, and then once again in the sort node, with tuples
being copied one-by-one between the two nodes).
 
If I understood correctly, the tuples emitted by Parallel Batch Sort
in each process are ordered by (hash(key, ...) % npartitions, key,
...), but the path is claiming to be ordered by (key, ...), no?
That's enough for Unique and Aggregate to give the correct answer,
because they really only require equal keys to be consecutive (and in
the same process), but maybe some other plan could break?


parallel distinct union and aggregate support patch

2020-10-19 Thread bu...@sohu.com
Hi hackers,
I wrote a patch to support parallel distinct, union, and aggregate using batch
sort.
steps:
 1. generate a hash value for the group clause values, and use the hash value
modulo the batch count to save each row to a batch
 2. at the end of the outer plan, wait for all other workers to finish writing
to the batches
 3. each worker gets a unique batch number and calls tuplesort_performsort()
to finish that batch's sort
 4. return the rows of this batch
 5. if not all batches are done, go to step 3

The BatchSort plan makes sure that the same tuples (by group clause) are
returned in the same range, so a Unique (or GroupAggregate) plan can work.
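In code form, a hedged pseudocode sketch of steps 1-5 (shared-state and helper
names hypothetical; the tuplesort, barrier, and atomic calls are real APIs):

/* Step 1: route each input row to a batch by hash of the group keys. */
for (;;)
{
    TupleTableSlot *slot = ExecProcNode(outerPlanState(state));

    if (TupIsNull(slot))
        break;
    hash = compute_group_hash(slot);                    /* hypothetical */
    tuplesort_puttupleslot(batches[hash % nbatches], slot);
}

/* Step 2: wait for every worker to finish writing its batches. */
BarrierArriveAndWait(&shared->barrier, 0 /* wait event, simplified */);

/* Steps 3-5: claim distinct batches, finish their sorts, emit rows. */
while ((b = pg_atomic_fetch_add_u32(&shared->next_batch, 1)) < nbatches)
{
    tuplesort_performsort(batches[b]);
    emit_batch_tuples(batches[b]);                      /* hypothetical */
}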

Patch 2 is for parallel aggregate; this is a simple use.
But regress fails for partitionwise aggregation because the plan changes
from GatherMerge->Sort->Append->...
to   Sort->Gather->Append->...
I have no idea how to fix it.

Using the same idea, I wrote a batch shared tuple store for HashAgg in our PG
version; I will send a patch for PG14 when I finish it.


The following is the same description, translated from Chinese:
My English is not good, so I am writing some Chinese here; if anything above
is wrong, please help correct it.
How BatchSort works:
 1. Compute a hash value from the group clause and place each row into a batch
by the hash value modulo the batch count.
 2. After the lower plan has returned all rows, wait for all the other worker
processes to finish.
 3. Each worker process claims a unique batch and calls tuplesort_performsort()
to complete the final sort.
 4. Return all rows of this batch.
 5. If not all batches have been read, go back to step 3.
The BatchSort plan guarantees that identical data (by the grouping expression)
is returned within the same cycle, so the distinct- and grouping-related plans
can work correctly.
The second patch supports parallel grouping: grouping is done only once,
instead of each parallel process grouping and the leader process then grouping
a second time.
That patch causes the partitionwise aggregation regression test to fail; the
original execution plan has changed.
The patch implements only one simple way of using the BatchSort plan; other
usages may need to be added.

Using the same idea, I wrote a HashAgg that uses a shared tuple store in our
AntDB version (the latest version is not yet open source); I will post it
after adapting it to PG14.
A small advertisement: welcome to follow AntDB, AsiaInfo's PG-based
distributed database product, open source at https://github.com/ADBSQL/AntDB


bu...@sohu.com


0001-Parallel-distinct-and-union-support.patch
Description: Binary data


0002-Parallel-aggregate-support-using-batch-sort.patch
Description: Binary data