Sailesh Mukil has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................

Patch Set 1:

(12 comments)

> This patch passes core, exhaustive and ASAN tests. It can execute
> 32 concurrent streams of TPCDS-Q17 @ scale factor 30000 on a
> 138-node cluster with Kerberos enabled. (I don't believe the
> previous implementation could do this effectively because of the
> number of Thrift connections required).
>
> Some perf results from a 20-node cluster:
>
> +------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
> | Workload   | Query    | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Num Clients | Iters |
> +------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
> | TPCH(_300) | TPCH-Q3  | parquet / none / none | 32.55  | 28.18       | +15.51%    | 4.71%     | 1.17%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q13 | parquet / none / none | 24.43  | 22.21       | +9.99%     | 0.61%     | 0.70%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q8  | parquet / none / none | 7.53   | 7.05        | +6.69%     | 1.70%     | 2.09%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q22 | parquet / none / none | 6.35   | 6.04        | +5.19%     | 0.37%     | 0.76%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q14 | parquet / none / none | 4.28   | 4.10        | +4.36%     | 0.03%     | 0.73%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q15 | parquet / none / none | 3.53   | 3.41        | +3.69%     | 0.61%     | 1.42%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q16 | parquet / none / none | 6.09   | 5.87        | +3.63%     | 0.15%     | 1.78%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q11 | parquet / none / none | 1.73   | 1.70        | +2.22%     | 0.10%     | 0.95%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q21 | parquet / none / none | 105.84 | 103.71      | +2.06%     | 0.57%     | 0.44%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q9  | parquet / none / none | 30.76  | 30.46       | +1.00%     | 2.57%     | 1.22%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q1  | parquet / none / none | 22.14  | 21.94       | +0.91%     | 0.81%     | 0.86%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q4  | parquet / none / none | 5.09   | 5.05        | +0.79%     | 0.48%     | 2.54%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q18 | parquet / none / none | 31.76  | 32.54       | -2.39%     | 0.44%     | 0.03%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q2  | parquet / none / none | 1.98   | 2.04        | -2.74%     | 7.17%     | 7.41%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q5  | parquet / none / none | 47.62  | 48.98       | -2.79%     | 0.51%     | 0.16%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q20 | parquet / none / none | 3.18   | 3.27        | -2.89%     | 1.34%     | 1.98%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q6  | parquet / none / none | 1.32   | 1.37        | -3.72%     | 0.03%     | 4.00%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q10 | parquet / none / none | 9.00   | 9.48        | -5.06%     | 0.16%     | 0.69%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q17 | parquet / none / none | 5.16   | 5.75        | -10.18%    | 6.44%     | 2.63%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q12 | parquet / none / none | 3.01   | 3.39        | -11.38%    | 2.43%     | 0.06%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q19 | parquet / none / none | 25.20  | 28.82       | I -12.57%  | 0.01%     | 0.75%          | 1           | 3     |
> | TPCH(_300) | TPCH-Q7  | parquet / none / none | 45.32  | 61.16       | I -25.91%  | 0.55%     | 2.22%          | 1           | 3     |
> +------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
>
> Primitives (note the significant regression in many_independent_fragments,
> that needs further attention)
>
> +---------------------+--------------------------------------------------------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
> | Workload            | Query                                                  | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Num Clients | Iters |
> +---------------------+--------------------------------------------------------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
> | TARGETED-PERF(_300) | primitive_many_independent_fragments                   | parquet / none / none | 377.69 | 189.40      | R +99.42%  | 0.32%     | 0.22%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_filter_bigint_in_list                        | parquet / none / none | 0.95   | 0.89        | +6.16%     | 0.01%     | 0.44%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_many_fragments                               | parquet / none / none | 60.87  | 57.83       | +5.25%     | 1.00%     | 0.54%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_groupby_bigint_highndv                       | parquet / none / none | 25.53  | 24.69       | +3.40%     | 1.30%     | 0.92%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_top-n_all                                    | parquet / none / none | 40.68  | 39.36       | +3.35%     | 0.75%     | 1.49%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_exchange_broadcast                           | parquet / none / none | 84.74  | 82.23       | +3.05%     | 5.18%     | 0.13%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_intrinsic_appx_median                        | parquet / none / none | 35.04  | 34.21       | +2.42%     | 1.01%     | 0.96%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_filter_string_like                           | parquet / none / none | 5.48   | 5.38        | +1.74%     | 0.94%     | 0.93%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_groupby_bigint_pk                            | parquet / none / none | 95.74  | 94.35       | +1.47%     | 0.82%     | 2.74%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_decimal_arithmetic                           | parquet / none / none | 100.51 | 99.40       | +1.12%     | 0.72%     | 1.62%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_conjunct_ordering_1                          | parquet / none / none | 4.73   | 4.68        | +1.04%     | 0.04%     | 1.12%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_groupby_bigint_lowndv                        | parquet / none / none | 3.34   | 3.32        | +0.48%     | 2.30%     | 0.00%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_orderby_bigint_expression                    | parquet / none / none | 18.49  | 18.42       | +0.41%     | 2.91%     | 4.33%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_filter_decimal_selective                     | parquet / none / none | 0.60   | 0.60        | +0.22%     | 0.27%     | 0.35%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_shuffle_join_one_to_many_string_with_groupby | parquet / none / none | 240.84 | 241.03      | -0.08%     | 0.46%     | 0.44%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_conjunct_ordering_3                          | parquet / none / none | 1.02   | 1.02        | -0.35%     | 0.07%     | 0.34%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_orderby_all                                  | parquet / none / none | 54.61  | 54.84       | -0.42%     | 1.48%     | 1.04%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_empty_build_join_1                           | parquet / none / none | 6.61   | 6.65        | -0.72%     | 0.02%     | 0.78%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_conjunct_ordering_4                          | parquet / none / none | 0.86   | 0.88        | -1.76%     | 0.28%     | 3.07%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_groupby_decimal_lowndv.test                  | parquet / none / none | 3.39   | 3.47        | -2.24%     | 2.29%     | 1.41%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_intrinsic_to_date                            | parquet / none / none | 79.05  | 81.70       | -3.24%     | 0.42%     | 0.36%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_filter_string_selective                      | parquet / none / none | 0.53   | 0.55        | -4.82%     | 4.22%     | 0.39%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_broadcast_join_2                             | parquet / none / none | 4.07   | 4.36        | -6.68%     | 0.66%     | 0.45%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_broadcast_join_3                             | parquet / none / none | 49.97  | 53.78       | -7.09%     | 0.45%     | 0.45%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_topn_bigint                                  | parquet / none / none | 5.13   | 5.52        | -7.12%     | 0.68%     | 0.06%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_filter_bigint_selective                      | parquet / none / none | 0.37   | 0.40        | -7.23%     | 7.01%     | 0.28%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_broadcast_join_1                             | parquet / none / none | 0.96   | 1.04        | -7.25%     | 0.08%     | 7.60%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_filter_string_non_selective                  | parquet / none / none | 0.90   | 0.98        | -7.67%     | 5.27%     | 2.36%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_filter_decimal_non_selective                 | parquet / none / none | 0.85   | 0.93        | -8.51%     | 0.06%     | 2.42%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_orderby_bigint                               | parquet / none / none | 14.56  | 16.13       | I -9.72%   | 0.44%     | 0.12%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_conjunct_ordering_2                          | parquet / none / none | 24.32  | 27.75       | I -12.36%  | 0.28%     | 0.63%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_groupby_decimal_highndv                      | parquet / none / none | 21.17  | 24.49       | -13.58%    | 0.35%     | 1.96%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_exchange_shuffle                             | parquet / none / none | 69.10  | 80.03       | -13.66%    | 0.10%     | 1.22%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_shuffle_join_union_all_with_groupby          | parquet / none / none | 56.55  | 67.01       | I -15.61%  | 1.06%     | 0.23%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_conjunct_ordering_5                          | parquet / none / none | 28.41  | 35.20       | -19.28%    | 5.12%     | 6.49%          | 1           | 3     |
> | TARGETED-PERF(_300) | primitive_filter_bigint_non_selective                  | parquet / none / none | 1.08   | 1.65        | I -34.87%  | 7.17%     | 3.12%          | 1           | 3     |
> +---------------------+--------------------------------------------------------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+

Did a first pass. Just to clarify: the commit message says that Kerberos isn't supported, but your benchmark was run with Kerberos enabled? Did you run it with a security patch to enable Kerberos?

http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/runtime/data-stream-mgr.cc
File be/src/runtime/data-stream-mgr.cc:

Line 61:   bool unused = false;
Doesn't the contract for FindRecvr() state that we need to hold 'lock_' before we call it?

Line 105:     EarlySendersList waiters;
Add a brief comment: "Process payloads of early senders for this receiver now that it's created" or something similar.

PS1, Line 123: for (int32_t sender_id: waiters.closing_senders) recvr->RemoveSender(sender_id);
According to the header comment in data-stream-mgr.h, a sender shouldn't be in 'closing_senders' if that same sender has a payload waiting in 'early_senders', which ensures that we process all payloads through AddBatch() before calling CloseSender().
The exception is when the sender got closed or cancelled and no longer cares about the previous TransmitData() payloads, presumably because the query failed or got cancelled. Given that, wouldn't it be better to call RemoveSender() first, before calling EnqueueRowBatch()? We don't want to process data if the query is being cancelled anyway.

PS1, Line 290: senders
Assign an appropriate name like "timed_out_senders" or something similar.

PS1, Line 300: early_senders_
Assume the following case:
- This contained closed_senders that are erased from here since the receiver never got created in time.
- Next, the ExchangeNode finally comes around, creates the receiver, and calls GetBatch().

It would hang indefinitely in GetBatch() at data_arrival_cv_.wait(), right? No data will ever arrive from the sender, since the sender has already sent the EndDataStream RPC.

Line 321:     // Wait for 10s
Add a brief comment stating that this is to check whether the DataStreamMgr is being shut down.

http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/runtime/data-stream-mgr.h
File be/src/runtime/data-stream-mgr.h:

PS1, Line 81: will do one of three things
nit: would be nice to format them as bullet points.

PS1, Line 83: if the buffer is full
"if the batch queues are full"? i.e. be more specific about which "buffer" this is.

PS1, Line 87: the sender
"the sender along with its payload"?

PS1, Line 124: to
from?

Line 224:   /// has not yet prepared 'payload' is queued until it arrives, or is timed out. If the
nit: been prepared,

http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/service/impala-server.h
File be/src/service/impala-server.h:

PS1, Line 255: void UpdateFilter
Leave a TODO stating that this should move to query-state.h/cc after IMPALA-3825 (if you agree).

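To make the Line 300 scenario in data-stream-mgr.cc concrete, here is a toy model of it. Everything in it is hypothetical (ToyReceiver, ToyStreamMgr, CloseSenderEarly, ExpireEarlySenders, WaitForEos); it is not the DataStreamMgr code from the patch, only a sketch that assumes early closes are replayed onto the receiver when it registers, and that the timeout path simply erases them.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <set>

// Hypothetical stand-in for a stream receiver; not Impala's DataStreamRecvr.
class ToyReceiver {
 public:
  explicit ToyReceiver(int num_senders) {
    assert(num_senders >= 0);
    for (int i = 0; i < num_senders; ++i) open_senders_.insert(i);
  }

  // Models an end-of-stream notification for one sender reaching the receiver.
  void RemoveSender(int sender_id) {
    std::lock_guard<std::mutex> l(lock_);
    open_senders_.erase(sender_id);
    data_arrival_cv_.notify_all();
  }

  // Models the wait in GetBatch(). Returns true once every sender has closed,
  // false if the timeout expires first. The timeout exists only so that this
  // toy terminates; an untimed wait() would block forever in the failing case.
  bool WaitForEos(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> l(lock_);
    return data_arrival_cv_.wait_for(
        l, timeout, [this] { return open_senders_.empty(); });
  }

 private:
  std::mutex lock_;
  std::condition_variable data_arrival_cv_;
  std::set<int> open_senders_;
};

// Hypothetical stand-in for the manager's early-sender bookkeeping.
class ToyStreamMgr {
 public:
  // A sender finished before the receiver was registered.
  void CloseSenderEarly(int sender_id) { early_closed_.insert(sender_id); }

  // Models the timeout path erasing early senders whose receiver was late.
  void ExpireEarlySenders() { early_closed_.clear(); }

  // Replays whatever early closes are still recorded onto the new receiver.
  void RegisterReceiver(ToyReceiver* recvr) {
    for (int sender_id : early_closed_) recvr->RemoveSender(sender_id);
    early_closed_.clear();
  }

 private:
  std::set<int> early_closed_;
};
```

In this model, a receiver registered before ExpireEarlySenders() runs sees end-of-stream right away, while one registered afterwards only returns from WaitForEos() because of the timeout, which is the hang described above.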
--
To view, visit http://gerrit.cloudera.org:8080/7103
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 1
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <[email protected]>
Gerrit-Reviewer: Henry Robinson <[email protected]>
Gerrit-Reviewer: Sailesh Mukil <[email protected]>
Gerrit-HasComments: Yes
