Sailesh Mukil has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 1:

(12 comments)

> This patch passes core, exhaustive and ASAN tests. It can execute
 > 32 concurrent streams of TPCDS-Q17 @ scale factor 30000 on a
 > 138-node cluster with Kerberos enabled. (I don't believe the
 > previous implementation could do this effectively because of the
 > number of Thrift connections required).
 > 
 > Some perf results from a 20-node cluster:
 > 
 > +------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
 > | Workload   | Query    | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Num Clients | Iters |
 > +------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
 > | TPCH(_300) | TPCH-Q3  | parquet / none / none | 32.55  | 28.18       |   +15.51%  |   4.71%   |   1.17%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q13 | parquet / none / none | 24.43  | 22.21       |   +9.99%   |   0.61%   |   0.70%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q8  | parquet / none / none | 7.53   | 7.05        |   +6.69%   |   1.70%   |   2.09%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q22 | parquet / none / none | 6.35   | 6.04        |   +5.19%   |   0.37%   |   0.76%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q14 | parquet / none / none | 4.28   | 4.10        |   +4.36%   |   0.03%   |   0.73%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q15 | parquet / none / none | 3.53   | 3.41        |   +3.69%   |   0.61%   |   1.42%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q16 | parquet / none / none | 6.09   | 5.87        |   +3.63%   |   0.15%   |   1.78%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q11 | parquet / none / none | 1.73   | 1.70        |   +2.22%   |   0.10%   |   0.95%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q21 | parquet / none / none | 105.84 | 103.71      |   +2.06%   |   0.57%   |   0.44%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q9  | parquet / none / none | 30.76  | 30.46       |   +1.00%   |   2.57%   |   1.22%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q1  | parquet / none / none | 22.14  | 21.94       |   +0.91%   |   0.81%   |   0.86%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q4  | parquet / none / none | 5.09   | 5.05        |   +0.79%   |   0.48%   |   2.54%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q18 | parquet / none / none | 31.76  | 32.54       |   -2.39%   |   0.44%   |   0.03%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q2  | parquet / none / none | 1.98   | 2.04        |   -2.74%   |   7.17%   |   7.41%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q5  | parquet / none / none | 47.62  | 48.98       |   -2.79%   |   0.51%   |   0.16%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q20 | parquet / none / none | 3.18   | 3.27        |   -2.89%   |   1.34%   |   1.98%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q6  | parquet / none / none | 1.32   | 1.37        |   -3.72%   |   0.03%   |   4.00%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q10 | parquet / none / none | 9.00   | 9.48        |   -5.06%   |   0.16%   |   0.69%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q17 | parquet / none / none | 5.16   | 5.75        |   -10.18%  |   6.44%   |   2.63%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q12 | parquet / none / none | 3.01   | 3.39        |   -11.38%  |   2.43%   |   0.06%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q19 | parquet / none / none | 25.20  | 28.82       | I -12.57%  |   0.01%   |   0.75%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q7  | parquet / none / none | 45.32  | 61.16       | I -25.91%  |   0.55%   |   2.22%        | 1           | 3     |
 > +------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
 > 
 > Primitives (note the significant regression in many_independent_fragments,
 > that needs further attention)
 > 
 > +---------------------+--------------------------------------------------------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
 > | Workload            | Query                                                  | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Num Clients | Iters |
 > +---------------------+--------------------------------------------------------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
 > | TARGETED-PERF(_300) | primitive_many_independent_fragments                   | parquet / none / none | 377.69 | 189.40      | R +99.42%  |   0.32%   |   0.22%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_filter_bigint_in_list                        | parquet / none / none | 0.95   | 0.89        |   +6.16%   |   0.01%   |   0.44%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_many_fragments                               | parquet / none / none | 60.87  | 57.83       |   +5.25%   |   1.00%   |   0.54%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_groupby_bigint_highndv                       | parquet / none / none | 25.53  | 24.69       |   +3.40%   |   1.30%   |   0.92%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_top-n_all                                    | parquet / none / none | 40.68  | 39.36       |   +3.35%   |   0.75%   |   1.49%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_exchange_broadcast                           | parquet / none / none | 84.74  | 82.23       |   +3.05%   |   5.18%   |   0.13%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_intrinsic_appx_median                        | parquet / none / none | 35.04  | 34.21       |   +2.42%   |   1.01%   |   0.96%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_filter_string_like                           | parquet / none / none | 5.48   | 5.38        |   +1.74%   |   0.94%   |   0.93%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_groupby_bigint_pk                            | parquet / none / none | 95.74  | 94.35       |   +1.47%   |   0.82%   |   2.74%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_decimal_arithmetic                           | parquet / none / none | 100.51 | 99.40       |   +1.12%   |   0.72%   |   1.62%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_conjunct_ordering_1                          | parquet / none / none | 4.73   | 4.68        |   +1.04%   |   0.04%   |   1.12%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_groupby_bigint_lowndv                        | parquet / none / none | 3.34   | 3.32        |   +0.48%   |   2.30%   |   0.00%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_orderby_bigint_expression                    | parquet / none / none | 18.49  | 18.42       |   +0.41%   |   2.91%   |   4.33%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_filter_decimal_selective                     | parquet / none / none | 0.60   | 0.60        |   +0.22%   |   0.27%   |   0.35%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_shuffle_join_one_to_many_string_with_groupby | parquet / none / none | 240.84 | 241.03      |   -0.08%   |   0.46%   |   0.44%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_conjunct_ordering_3                          | parquet / none / none | 1.02   | 1.02        |   -0.35%   |   0.07%   |   0.34%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_orderby_all                                  | parquet / none / none | 54.61  | 54.84       |   -0.42%   |   1.48%   |   1.04%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_empty_build_join_1                           | parquet / none / none | 6.61   | 6.65        |   -0.72%   |   0.02%   |   0.78%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_conjunct_ordering_4                          | parquet / none / none | 0.86   | 0.88        |   -1.76%   |   0.28%   |   3.07%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_groupby_decimal_lowndv.test                  | parquet / none / none | 3.39   | 3.47        |   -2.24%   |   2.29%   |   1.41%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_intrinsic_to_date                            | parquet / none / none | 79.05  | 81.70       |   -3.24%   |   0.42%   |   0.36%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_filter_string_selective                      | parquet / none / none | 0.53   | 0.55        |   -4.82%   |   4.22%   |   0.39%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_broadcast_join_2                             | parquet / none / none | 4.07   | 4.36        |   -6.68%   |   0.66%   |   0.45%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_broadcast_join_3                             | parquet / none / none | 49.97  | 53.78       |   -7.09%   |   0.45%   |   0.45%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_topn_bigint                                  | parquet / none / none | 5.13   | 5.52        |   -7.12%   |   0.68%   |   0.06%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_filter_bigint_selective                      | parquet / none / none | 0.37   | 0.40        |   -7.23%   |   7.01%   |   0.28%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_broadcast_join_1                             | parquet / none / none | 0.96   | 1.04        |   -7.25%   |   0.08%   |   7.60%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_filter_string_non_selective                  | parquet / none / none | 0.90   | 0.98        |   -7.67%   |   5.27%   |   2.36%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_filter_decimal_non_selective                 | parquet / none / none | 0.85   | 0.93        |   -8.51%   |   0.06%   |   2.42%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_orderby_bigint                               | parquet / none / none | 14.56  | 16.13       | I -9.72%   |   0.44%   |   0.12%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_conjunct_ordering_2                          | parquet / none / none | 24.32  | 27.75       | I -12.36%  |   0.28%   |   0.63%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_groupby_decimal_highndv                      | parquet / none / none | 21.17  | 24.49       |   -13.58%  |   0.35%   |   1.96%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_exchange_shuffle                             | parquet / none / none | 69.10  | 80.03       |   -13.66%  |   0.10%   |   1.22%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_shuffle_join_union_all_with_groupby          | parquet / none / none | 56.55  | 67.01       | I -15.61%  |   1.06%   |   0.23%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_conjunct_ordering_5                          | parquet / none / none | 28.41  | 35.20       |   -19.28%  |   5.12%   |   6.49%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_filter_bigint_non_selective                  | parquet / none / none | 1.08   | 1.65        | I -34.87%  |   7.17%   |   3.12%        | 1           | 3     |
 > +---------------------+--------------------------------------------------------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+

Did a first pass.

Just to clarify: the commit message says Kerberos isn't supported, but your 
benchmark ran with Kerberos enabled. Did you run it with a security patch to 
enable Kerberos?

http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/runtime/data-stream-mgr.cc
File be/src/runtime/data-stream-mgr.cc:

Line 61:           bool unused = false;
Doesn't the contract for FindRecvr() state that we need to hold 'lock_' before 
we call it?
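
As an illustration only, here is a minimal sketch of one way to make a "caller must hold 'lock_'" contract harder to violate. Every name in it (ToyStreamMgr, ToyRecvr, FindRecvrLocked) is hypothetical and not taken from the patch; the idea is just that the lookup receives the caller's lock guard and checks that it is actually held.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <mutex>

// Hypothetical stand-in for a stream receiver; illustrative only.
struct ToyRecvr {
  int dest_node_id;
};

class ToyStreamMgr {
 public:
  void AddRecvr(int dest_node_id) {
    std::unique_lock<std::mutex> l(lock_);
    recvrs_[dest_node_id] = std::make_shared<ToyRecvr>(ToyRecvr{dest_node_id});
  }

  // Public entry point: takes 'lock_' itself, then delegates to the
  // variant that requires the lock to be held.
  std::shared_ptr<ToyRecvr> FindRecvr(int dest_node_id) {
    std::unique_lock<std::mutex> l(lock_);
    return FindRecvrLocked(dest_node_id, l);
  }

 private:
  // Contract: the caller must hold 'lock_'. Passing the guard lets a debug
  // build verify that instead of relying on a comment alone.
  std::shared_ptr<ToyRecvr> FindRecvrLocked(
      int dest_node_id, const std::unique_lock<std::mutex>& held) {
    assert(held.owns_lock() && held.mutex() == &lock_);
    auto it = recvrs_.find(dest_node_id);
    if (it == recvrs_.end()) return nullptr;
    return it->second;
  }

  std::mutex lock_;
  std::map<int, std::shared_ptr<ToyRecvr>> recvrs_;
};
```

Whether the real FindRecvr() should take the lock itself or require the caller to hold it depends on its call sites; the sketch only shows how the requirement can be checked.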


Line 105:   EarlySendersList waiters;
Add brief comment:
"Process payloads of early senders for this receiver now that it's created" or 
something similar.


PS1, Line 123: for (int32_t sender_id: waiters.closing_senders) recvr->RemoveSender(sender_id);
According to the header comment in data-stream-mgr.h, a sender shouldn't be in 
'closing_senders' if that same sender has a payload waiting in 
'early_senders', which ensures that we process all payloads through AddBatch() 
before calling CloseSender().

This is unless the sender got closed or cancelled and doesn't care about the 
previous TransmitData() payloads, presumably because the query failed or got 
cancelled.

Given that, wouldn't it be better to call RemoveSender() first before calling 
EnqueueRowBatch()? We don't want to process data if the query is being 
cancelled anyway.
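
The proposed ordering can be sketched roughly as follows. This is a toy model under stated assumptions, not the patch's code: ToyEarlySenders, ToyRecvr and ProcessEarlySenders are hypothetical names, payloads are plain strings, and the toy receiver is assumed to drop payloads from a sender that has already been removed.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the early-sender bookkeeping.
struct ToyEarlySenders {
  std::vector<std::pair<int, std::string>> waiting_payloads;  // (sender_id, payload)
  std::set<int> closing_senders;
};

// Toy receiver that logs each call so the ordering can be inspected.
struct ToyRecvr {
  std::vector<std::string> log;
  std::set<int> removed;

  void RemoveSender(int sender_id) {
    removed.insert(sender_id);
    log.push_back("remove:" + std::to_string(sender_id));
  }

  void EnqueueRowBatch(int sender_id, const std::string& payload) {
    // Assumption: data from a sender that was already removed is not wanted.
    if (removed.count(sender_id) > 0) return;
    log.push_back("enqueue:" + std::to_string(sender_id) + ":" + payload);
  }
};

// Handles closing senders first, then the waiting payloads, so that no work
// is spent on data from senders that were closed or cancelled.
void ProcessEarlySenders(const ToyEarlySenders& waiters, ToyRecvr* recvr) {
  assert(recvr != nullptr);
  for (int sender_id : waiters.closing_senders) recvr->RemoveSender(sender_id);
  for (const auto& p : waiters.waiting_payloads) {
    recvr->EnqueueRowBatch(p.first, p.second);
  }
}
```

With sender 2 present in both lists, only sender 1's payload is enqueued; with the opposite order, sender 2's payload would be processed before the sender is removed.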


PS1, Line 290: senders
Assign an appropriate name like "timed_out_senders" or something similar.


PS1, Line 300: early_senders_
Assume the following case:

- This contained closed senders, which are erased from here because the 
receiver was never created in time.
- Next, the ExchangeNode finally comes around, creates the receiver, and 
calls GetBatch().

It would hang indefinitely in GetBatch() at data_arrival_cv_.wait(), right? 
No data will ever arrive from the sender, because the sender already sent the 
EndDataStream RPC.
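
To illustrate the scenario, here is a minimal sketch of a receiver whose wait predicate covers both "a batch arrived" and "all senders are done". Only data_arrival_cv_ and the GetBatch()/AddBatch()/RemoveSender() names come from the discussion above; the ToyRecvr class, its members and its signatures are assumptions made for the example.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>

class ToyRecvr {
 public:
  explicit ToyRecvr(int num_senders) : num_remaining_senders_(num_senders) {}

  void AddBatch(const std::string& batch) {
    {
      std::lock_guard<std::mutex> l(lock_);
      batches_.push_back(batch);
    }
    data_arrival_cv_.notify_one();
  }

  // Must be called once per sender, including senders whose end-of-stream
  // arrived before the receiver existed.
  void RemoveSender() {
    {
      std::lock_guard<std::mutex> l(lock_);
      assert(num_remaining_senders_ > 0);
      --num_remaining_senders_;
    }
    data_arrival_cv_.notify_all();
  }

  // Blocks until a batch is available or every sender is done.
  // Returns false at end of stream.
  bool GetBatch(std::string* out) {
    std::unique_lock<std::mutex> l(lock_);
    data_arrival_cv_.wait(l, [this] {
      return !batches_.empty() || num_remaining_senders_ == 0;
    });
    if (batches_.empty()) return false;
    *out = batches_.front();
    batches_.pop_front();
    return true;
  }

 private:
  std::mutex lock_;
  std::condition_variable data_arrival_cv_;
  std::deque<std::string> batches_;
  int num_remaining_senders_;
};
```

In this model, if an early end-of-stream is discarded on timeout and never replayed as RemoveSender(), the remaining-sender count stays above zero and GetBatch() blocks forever, which is the hang described above.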


Line 321:     // Wait for 10s
Add a brief comment stating that this is to check whether the DataStreamMgr is 
being shut down.
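
For context, a periodic wait that doubles as a shutdown check might look like the sketch below. It is not the patch's code: ToyMaintenance, WaitForShutdown() and the member names are hypothetical, and the interval is a parameter rather than the 10s used in the patch.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

class ToyMaintenance {
 public:
  // Sleeps for up to 'interval', waking early if shutdown is requested.
  // Returns true if shutdown was requested, false if the interval elapsed.
  bool WaitForShutdown(std::chrono::milliseconds interval) {
    assert(interval.count() >= 0);
    std::unique_lock<std::mutex> l(lock_);
    // The timed wait exists so that the loop can notice shutdown promptly
    // instead of sleeping unconditionally.
    return shutdown_cv_.wait_for(l, interval, [this] { return shut_down_; });
  }

  void Shutdown() {
    {
      std::lock_guard<std::mutex> l(lock_);
      shut_down_ = true;
    }
    shutdown_cv_.notify_all();
  }

 private:
  std::mutex lock_;
  std::condition_variable shutdown_cv_;
  bool shut_down_ = false;
};
```

A maintenance loop built on this would do its periodic work each time WaitForShutdown() returns false and exit when it returns true.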


http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/runtime/data-stream-mgr.h
File be/src/runtime/data-stream-mgr.h:

PS1, Line 81: will do one of three things
nit: would be nice to format them as bullet points.


PS1, Line 83: if the buffer is full
"if the batch queues are full"?
i.e. be more specific about what "buffer" it is.


PS1, Line 87: the sender
"the sender along with its payload" ?


PS1, Line 124: to
from?


Line 224:   /// has not yet prepared 'payload' is queued until it arrives, or is timed out. If the
nit: been prepared,


http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/service/impala-server.h
File be/src/service/impala-server.h:

PS1, Line 255: void UpdateFilter
Leave a TODO stating that this should move to query-state.h/cc after 
IMPALA-3825 (if you agree).


-- 
To view, visit http://gerrit.cloudera.org:8080/7103
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 1
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <[email protected]>
Gerrit-Reviewer: Henry Robinson <[email protected]>
Gerrit-Reviewer: Sailesh Mukil <[email protected]>
Gerrit-HasComments: Yes
