Mostafa Mokhtar created IMPALA-6746:
---------------------------------------

             Summary: Reduce the number of comparisons for analytic functions 
with partitioning when incoming data is clustered
                 Key: IMPALA-6746
                 URL: https://issues.apache.org/jira/browse/IMPALA-6746
             Project: IMPALA
          Issue Type: Improvement
          Components: Backend
    Affects Versions: Impala 2.13.0
            Reporter: Mostafa Mokhtar
            Assignee: Tianyi Wang
         Attachments: percentile query profile 2.txt

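Checking whether the current row belongs to the same partition as the previous 
row in the ANALYTIC node is very expensive, because it performs N comparisons, 
where N is the number of rows. When the cardinality of the partition column(s) 
is relatively small, the values will be clustered, so long runs of consecutive 
rows (often an entire batch) belong to the same partition.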
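One optimization, proposed by [~alex.behm], is to compare the first and last 
tuples in the batch and, if they match, avoid calling 
AnalyticEvalNode::PrevRowCompare for the entire batch. Because the input to the 
ANALYTIC node is sorted by the partition columns, matching first and last tuples 
imply that every row in the batch belongs to the same partition.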

For the attached query, which follows a common pattern, the expected speedup is 20-30%.

Query
{code}
select l_commitdate
    ,avg(l_extendedprice) as avg_perc
    ,percentile_cont (.25) within group (order by l_extendedprice asc) as 
perc_25
    ,percentile_cont (.5) within group (order by l_extendedprice asc) as perc_50
    ,percentile_cont (.75) within group (order by l_extendedprice asc) as 
perc_75
    ,percentile_cont (.90) within group (order by l_extendedprice asc) as 
perc_90
from lineitem
group by l_commitdate
order by l_commitdate
{code}

Plan
{code}
F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
|  Per-Host Resources: mem-estimate=0B mem-reservation=0B
PLAN-ROOT SINK
|  mem-estimate=0B mem-reservation=0B
|
09:MERGING-EXCHANGE [UNPARTITIONED]
|  order by: l_commitdate ASC
|  mem-estimate=0B mem-reservation=0B
|  tuple-ids=5 row-size=66B cardinality=2559
|
F02:PLAN FRAGMENT [HASH(l_commitdate)] hosts=1 instances=1
Per-Host Resources: mem-estimate=22.00MB mem-reservation=13.94MB
05:SORT
|  order by: l_commitdate ASC
|  mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB
|  tuple-ids=5 row-size=66B cardinality=2559
|
08:AGGREGATE [FINALIZE]
|  output: avg:merge(l_extendedprice), 
_percentile_cont_interpolation:merge(l_extendedprice, 
`_percentile_row_number_diff_0`), 
_percentile_cont_interpolation:merge(l_extendedprice, 
`_percentile_row_number_diff_1`), 
_percentile_cont_interpolation:merge(l_extendedprice, 
`_percentile_row_number_diff_2`), 
_percentile_cont_interpolation:merge(l_extendedprice, 
`_percentile_row_number_diff_3`)
|  group by: l_commitdate
|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
|  tuple-ids=4 row-size=66B cardinality=2559
|
07:EXCHANGE [HASH(l_commitdate)]
|  mem-estimate=0B mem-reservation=0B
|  tuple-ids=3 row-size=66B cardinality=2559
|
F01:PLAN FRAGMENT [HASH(l_commitdate)] hosts=1 instances=1
Per-Host Resources: mem-estimate=64.00MB mem-reservation=22.00MB
04:AGGREGATE [STREAMING]
|  output: avg(l_extendedprice), 
_percentile_cont_interpolation(l_extendedprice, row_number() - 1 - 
count(l_extendedprice) - 1 * 0.25), 
_percentile_cont_interpolation(l_extendedprice, row_number() - 1 - 
count(l_extendedprice) - 1 * 0.5), 
_percentile_cont_interpolation(l_extendedprice, row_number() - 1 - 
count(l_extendedprice) - 1 * 0.75), 
_percentile_cont_interpolation(l_extendedprice, row_number() - 1 - 
count(l_extendedprice) - 1 * 0.90)
|  group by: l_commitdate
|  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB
|  tuple-ids=3 row-size=66B cardinality=2559
|
03:ANALYTIC
|  functions: count(l_extendedprice)
|  partition by: l_commitdate
|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB
|  tuple-ids=9,7,8 row-size=50B cardinality=59986052
|
02:ANALYTIC
|  functions: row_number()
|  partition by: l_commitdate
|  order by: l_extendedprice ASC
|  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB
|  tuple-ids=9,7 row-size=42B cardinality=59986052
|
01:SORT
|  order by: l_commitdate ASC NULLS FIRST, l_extendedprice ASC NULLS LAST
|  mem-estimate=46.00MB mem-reservation=12.00MB spill-buffer=2.00MB
|  tuple-ids=9 row-size=34B cardinality=59986052
|
06:EXCHANGE [HASH(l_commitdate)]
|  mem-estimate=0B mem-reservation=0B
|  tuple-ids=0 row-size=34B cardinality=59986052
|
F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
Per-Host Resources: mem-estimate=88.00MB mem-reservation=0B
00:SCAN HDFS [tpch_10_parquet.lineitem, RANDOM]
   partitions=1/1 files=15 size=2.05GB
   stored statistics:
     table: rows=59986052 size=2.05GB
     columns: all
   extrapolated-rows=disabled
   mem-estimate=88.00MB mem-reservation=0B
   tuple-ids=0 row-size=34B cardinality=59986052
{code}

Call stack
{code}
libc.so.6!__memcmp_sse4_1 - memcmp-sse4.S
impalad!StringCompare+0x14 - string-value.inline.h:40
impalad!impala::StringValue::Eq+0x23 - string-value.inline.h:62
impalad!impala::StringValue::operator==+0 - string-value.inline.h:66
impalad!impala::Operators::Eq_StringVal_StringVal+0xd - operators-ir.cc:227
impalad!impala::ScalarFnCall::InterpretEval<impala_udf::BooleanVal>+0x597 - 
scalar-fn-call.cc:485
impalad!impala::ScalarFnCall::GetBooleanVal+0x24 - scalar-fn-call.cc:536
impalad!impala::AndPredicate::GetBooleanVal+0x4d - compound-predicates.cc:36
impalad!impala::OrPredicate::GetBooleanVal+0x4d - compound-predicates.cc:56
impalad!impala::AndPredicate::GetBooleanVal+0x29 - compound-predicates.cc:33
impalad!impala::ScalarExprEvaluator::GetBooleanVal+0x16 - 
scalar-expr-evaluator.cc:368
impalad!impala::AnalyticEvalNode::PrevRowCompare+0xb - analytic-eval-node.cc:591
impalad!impala::AnalyticEvalNode::ProcessChildBatch+0x227 - 
analytic-eval-node.cc:644
impalad!impala::AnalyticEvalNode::ProcessChildBatches+0xf5 - 
analytic-eval-node.cc:604
impalad!impala::AnalyticEvalNode::GetNext+0x269 - analytic-eval-node.cc:786
impalad!impala::AnalyticEvalNode::ProcessChildBatches+0xbf - 
analytic-eval-node.cc:602
impalad!impala::AnalyticEvalNode::GetNext+0x269 - analytic-eval-node.cc:786
impalad!impala::PartitionedAggregationNode::GetRowsStreaming+0xa6 - 
partitioned-aggregation-node.cc:478
impalad!impala::PartitionedAggregationNode::GetNext+0x221 - 
partitioned-aggregation-node.cc:369
impalad!impala::FragmentInstanceState::ExecInternal+0x1b1 - 
fragment-instance-state.cc:277
impalad!impala::FragmentInstanceState::Exec+0x29e - 
fragment-instance-state.cc:89
impalad!impala::QueryState::ExecFInstance+0x249 - query-state.cc:394
impalad!boost::function0<void>::operator()+0x1a - function_template.hpp:767
impalad!impala::Thread::SuperviseThread+0x2e4 - thread.cc:356
impalad!operator()<void (*)(const std::basic_string<char>&, const 
std::basic_string<char>&, boost::function<void()>, const 
impala::ThreadDebugInfo*, impala::Promise<long int>*), boost::_bi::list0>+0x5b 
- bind.hpp:525
impalad!boost::_bi::bind_t<void, void (*)(std::string const&, std::string 
const&, boost::function<void (void)>, impala::ThreadDebugInfo const*, 
impala::Promise<long>*), boost::_bi::list5<boost::_bi::value<std::string>, 
boost::_bi::value<std::string>, boost::_bi::value<boost::function<void 
(void)>>, boost::_bi::value<impala::ThreadDebugInfo*>, 
boost::_bi::value<impala::Promise<long>*>>>::operator()+0 - bind_template.hpp:20
impalad!boost::detail::thread_data<boost::_bi::bind_t<void, void 
(*)(std::string const&, std::string const&, boost::function<void (void)>, 
impala::ThreadDebugInfo const*, impala::Promise<long>*), 
boost::_bi::list5<boost::_bi::value<std::string>, 
boost::_bi::value<std::string>, boost::_bi::value<boost::function<void 
(void)>>, boost::_bi::value<impala::ThreadDebugInfo*>, 
boost::_bi::value<impala::Promise<long>*>>>>::run+0x1e - thread.hpp:116
impalad!thread_proxy+0xd9 - [Unknown]:[Unknown]
libpthread.so.0!start_thread+0xc1 - pthread_create.c:312
libc.so.6!__clone+0x6c - clone.S:111
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to