On 11.11.2017 23:29, Konstantin Knizhnik wrote:
On 10/27/2017 02:01 PM, Jeevan Chalke wrote:
Hi,
Attached new patch-set here. Changes include:
1. Added separate patch for costing Append node as discussed up-front in
the patch-set.
2. Since we now cost Append node, we don't need
partition_wise_agg_cost_factor GUC. So removed that. The remaining patch
hence merged into main implementation patch.
3. Updated rows in test-cases so that we will get partition-wise plans.
Thanks
I applied the partition-wise-agg-v6.tar.gz patch to master and used the
shard.sh example from
https://www.postgresql.org/message-id/14577.1509723225%40localhost
Plan for count(*) is the following:
shard=# explain select count(*) from orders;
QUERY PLAN
---------------------------------------------------------------------------------------
Finalize Aggregate (cost=100415.29..100415.30 rows=1 width=8)
-> Append (cost=50207.63..100415.29 rows=2 width=8)
-> Partial Aggregate (cost=50207.63..50207.64 rows=1 width=8)
-> Foreign Scan on orders_0 (cost=101.00..50195.13 rows=5000 width=0)
-> Partial Aggregate (cost=50207.63..50207.64 rows=1 width=8)
-> Foreign Scan on orders_1 (cost=101.00..50195.13 rows=5000 width=0)
We do calculate a partial aggregate for each partition, but to do so we
still have to fetch all the data from the remote host.
So for foreign partitions such a plan is absolutely inefficient.
Maybe it should be combined with some other patch?
For example, with the agg_pushdown_v4.tgz patch
https://www.postgresql.org/message-id/14577.1509723225%40localhost ?
But it does not apply on top of the partition-wise-agg-v6.tar.gz patch.
Also postgres_fdw in 11dev is able to push down aggregates without
agg_pushdown_v4.tgz patch.
In 0009-Teach-postgres_fdw-to-push-aggregates-for-child-rela.patch
there is the following check:
/* Partial aggregates are not supported. */
+ if (extra->isPartial)
+ return;
If we just comment out this check, the produced plan is the following:
shard=# explain select sum(product_id) from orders;
QUERY PLAN
----------------------------------------------------------------
Finalize Aggregate (cost=308.41..308.42 rows=1 width=8)
-> Append (cost=144.18..308.41 rows=2 width=8)
-> Foreign Scan (cost=144.18..154.20 rows=1 width=8)
Relations: Aggregate on (public.orders_0 orders)
-> Foreign Scan (cost=144.18..154.20 rows=1 width=8)
Relations: Aggregate on (public.orders_1 orders)
(6 rows)
And this is actually the desired plan!
Obviously such an approach will not always work: FDW really doesn't
support partial aggregates now.
But for the most frequently used aggregates (sum, min, max, count)
aggtype==aggtranstype and there is no difference
between partial and normal aggregate calculation.
So instead of the (extra->isPartial) condition we can add a more complex
check which traverses the pathtarget expressions and
checks whether they can be evaluated in this way. Or... extend the FDW
API to support partial aggregation.
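A minimal sketch of that "more complex check", written as a standalone
model rather than as planner code. AggInfo and can_push_partial are
hypothetical names standing in for the two Aggref fields and for the
walk over the target list; the type pairs in the usage note are
illustrative and should be checked against pg_aggregate.

```c
#include <assert.h>
#include <stdbool.h>

/* Type OIDs as defined in PostgreSQL's pg_type catalog. */
#define INT8OID      20
#define INT4OID      23
#define INT8ARRAYOID 1016
#define NUMERICOID   1700

/* Hypothetical stand-in for the two Aggref fields the check looks at. */
typedef struct AggInfo
{
    const char *name;
    unsigned    aggtype;        /* type of the final result */
    unsigned    aggtranstype;   /* type of the transition state */
} AggInfo;

/* True when the partial (transition) value already has the final type. */
bool
partial_equals_final(const AggInfo *agg)
{
    return agg->aggtype == agg->aggtranstype;
}

/*
 * A target list qualifies for partial pushdown only if every aggregate
 * in it passes the check; one "complex" aggregate disqualifies the list.
 */
bool
can_push_partial(const AggInfo *aggs, int n)
{
    assert(n >= 0);
    for (int i = 0; i < n; i++)
    {
        if (!partial_equals_final(&aggs[i]))
            return false;
    }
    return true;
}
```

With a list such as {sum(int4): INT8OID/INT8OID, count(*):
INT8OID/INT8OID} the function returns true; adding avg(int4), whose
transition state is an int8 array while its result is numeric, makes it
return false. Equal types are only a heuristic here: the check says
nothing about whether the combine function matches the transition
function.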
But even the last plan is not ideal: it will calculate predicates at
each remote node sequentially.
There is parallel append patch:
https://www.postgresql.org/message-id/CAJ3gD9ctEcrVUmpY6fq_JUB6WDKGXAGd70EY68jVFA4kxMbKeQ%40mail.gmail.com
but ... FDW doesn't support parallel scan, so parallel append cannot
be applied in this case.
And we actually do not need parallel append with all its dynamic
workers here.
We just need to start commands at all remote servers and only after that
fetch the results (which can be done sequentially).
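To make the difference concrete, here is a toy timing model of the two
strategies. It is a sketch under simplifying assumptions: Shard and its
run_ms field are hypothetical, sending a command is treated as free, and
only remote execution time is counted. In libpq terms the first phase
would correspond to PQsendQuery on every connection and the second to
PQgetResult on each of them in turn.

```c
#include <assert.h>

/* Hypothetical model of one remote shard: how long its aggregate runs. */
typedef struct Shard
{
    int run_ms;     /* assumed remote execution time, in milliseconds */
} Shard;

/* Blocking style: each query is sent and awaited before the next starts. */
int
elapsed_sequential(const Shard *shards, int n)
{
    int now = 0;

    assert(n >= 0);
    for (int i = 0; i < n; i++)
        now += shards[i].run_ms;
    return now;
}

/*
 * Send-then-collect style: all queries are dispatched at time 0, then the
 * results are collected one by one in a fixed order.
 */
int
elapsed_send_then_collect(const Shard *shards, int n)
{
    int now = 0;

    assert(n >= 0);
    for (int i = 0; i < n; i++)
    {
        /* Shard i finishes at run_ms; we wait only if it is not done yet. */
        if (shards[i].run_ms > now)
            now = shards[i].run_ms;
    }
    return now;
}
```

For three shards taking 300, 500 and 200 ms the first function returns
1000 and the second 500: the total is bounded by the slowest shard even
though results are still collected sequentially by a single backend.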
I am investigating the problem of efficient execution of OLAP queries on
sharded tables (tables with remote partitions).
After reading all these threads and the corresponding patches, it seems
to me that we already have most of the pieces of the puzzle; what we need
is to put them in the right places and maybe add the missing ones.
I wonder if somebody is busy with it and whether I can somehow help here?
Also I am not quite sure about the best approach to parallel
execution of a distributed query at all nodes.
Should we make postgres_fdw parallel safe and use parallel append? How
difficult will it be?
Or in addition to parallel append should we also have an "asynchronous
append" which will be able to initiate execution at all nodes?
It seems to be close to merge append, because it should simultaneously
traverse all cursors.
The second approach looks easier to implement. But in the case
of a sharded table, a distributed query may need to traverse both remote
and local shards, and this approach doesn't allow several
local shards to be processed in parallel.
I attach a small patch for postgres_fdw.c which allows concurrent
execution of aggregates by all remote servers (when they are accessed
through postgres_fdw).
I have added the "postgres_fdw.use_prefetch" GUC to enable/disable
prefetching data in postgres_fdw.
This patch should be applied after the partition-wise-agg-v6.tar.gz patch.
With the shard example and the following two GUCs set:
shard=# set postgres_fdw.use_prefetch=on;
shard=# set enable_partition_wise_agg=on;
shard=# select sum(product_id) from orders;
sum
---------
9965891
(1 row)
shard=# explain select sum(product_id) from orders;
QUERY PLAN
----------------------------------------------------------------
Finalize Aggregate (cost=308.41..308.42 rows=1 width=8)
-> Append (cost=144.18..308.41 rows=2 width=8)
-> Foreign Scan (cost=144.18..154.20 rows=1 width=8)
Relations: Aggregate on (public.orders_0 orders)
-> Foreign Scan (cost=144.18..154.20 rows=1 width=8)
Relations: Aggregate on (public.orders_1 orders)
(6 rows)
The sum aggregate is calculated in parallel by both servers.
I have not tested it much; it is just a proof of concept.
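The control flow of the prefetching can be modelled without a server. The
sketch below is a standalone mock, not the patch itself: MockScan,
mock_prefetch and mock_fetch are hypothetical names mirroring the scan
state, prefetch_more_data() and the prefetching path of the fetch routine.
The "send now if nothing is pending" branch is an addition of this model
and is not present in the patch.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the remote cursor state kept by the scan. */
typedef struct MockScan
{
    int  remote_rows;       /* rows still available on the remote side */
    int  fetch_size;        /* rows requested per FETCH */
    bool request_pending;   /* a FETCH was sent and not yet collected */
    bool eof_reached;       /* the last batch was a short one */
    int  requests_sent;     /* number of FETCH commands issued so far */
} MockScan;

/* Models sending a FETCH without waiting for its result. */
void
mock_prefetch(MockScan *scan)
{
    assert(!scan->request_pending);
    scan->request_pending = true;
    scan->requests_sent++;
}

/* Models collecting the pending batch and requesting the next one. */
int
mock_fetch(MockScan *scan)
{
    int got;

    /* Model-only fallback: nothing outstanding, so send the FETCH now. */
    if (!scan->request_pending)
        mock_prefetch(scan);

    /* Collect the batch the remote side produced for the pending FETCH. */
    got = scan->remote_rows < scan->fetch_size
        ? scan->remote_rows
        : scan->fetch_size;
    scan->remote_rows -= got;
    scan->request_pending = false;

    /* A short batch means the cursor is exhausted. */
    scan->eof_reached = (got < scan->fetch_size);

    /* Overlap the next remote fetch with local processing of this batch. */
    if (!scan->eof_reached)
        mock_prefetch(scan);
    return got;
}
```

Starting from 250 remote rows and a fetch size of 100, one initial
mock_prefetch followed by calls to mock_fetch until eof_reached yields
batches of 100, 100 and 50 rows with three FETCH commands in total, and
no request is left pending at the end.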
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
270a271,272
> void _PG_init(void);
>
372a375
> static void prefetch_more_data(ForeignScanState *node);
425a429,430
> static bool fdw_prefetch_data;
>
1377a1383,1387
> create_cursor(node);
> if (fdw_prefetch_data)
> {
> prefetch_more_data(node);
> }
2989a3000,3010
> static void
> prefetch_more_data(ForeignScanState *node)
> {
> PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
> char sql[64];
> snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
> fsstate->fetch_size, fsstate->cursor_number);
> if (!PQsendQuery(fsstate->conn, sql))
> pgfdw_report_error(ERROR, NULL, fsstate->conn, false, sql);
> }
>
3019c3040,3042
< res = pgfdw_exec_query(conn, sql);
---
> res = fdw_prefetch_data
> ? pgfdw_get_result(conn, sql)
> : pgfdw_exec_query(conn, sql);
3049d3071
<
3051a3074,3075
> if (!fsstate->eof_reached)
> prefetch_more_data(node);
4836a4861,4879
>
> static bool
> contains_complex_aggregate(Node *node, void *context)
> {
> if (node == NULL)
> return false;
>
> if (IsA(node, Aggref))
> {
> Aggref* agg = (Aggref*)node;
> return agg->aggtranstype != agg->aggtype;
> }
>
> return expression_tree_walker(node,
> contains_complex_aggregate,
> context);
> }
>
>
4866c4909
< if (extra->isPartial)
---
> if (extra->isPartial && expression_tree_walker((Node*)extra->pathTarget->exprs, contains_complex_aggregate, NULL))
5190a5234,5246
> }
>
> void _PG_init(void)
> {
> DefineCustomBoolVariable(
> "postgres_fdw.use_prefetch",
> "Prefetch data from cursor",
> NULL,
> &fdw_prefetch_data,
> false,
> PGC_SUSET,
> 0,
> NULL, NULL, NULL);