Great! We’re on 2.11 now. I’ll do some before/after benchmarks this week.

From: Todd Lipcon <[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Date: Monday, July 23, 2018 at 10:05 AM
To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: "broadcast" tablet replication for kudu?

Impala 2.12. The external RPC protocol is still Thrift.

Todd

On Mon, Jul 23, 2018, 7:02 AM Clifford Resnick 
<[email protected]<mailto:[email protected]>> wrote:
Is this Impala 3.0? I’m concerned about breaking changes, and our RPC to Impala 
is Thrift-based.

From: Todd Lipcon <[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Date: Monday, July 23, 2018 at 9:46 AM
To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: "broadcast" tablet replication for kudu?

Are you on the latest release of Impala? It switched from using Thrift for RPC 
to a new implementation (actually borrowed from Kudu), which might help 
broadcast performance a bit.

Todd

On Mon, Jul 23, 2018, 6:43 AM Boris Tyukin 
<[email protected]<mailto:[email protected]>> wrote:
Sorry to revive the old thread, but I am curious whether there is a good way to 
speed up requests to frequently used tables in Kudu.

On Thu, Apr 12, 2018 at 8:19 AM Boris Tyukin 
<[email protected]<mailto:[email protected]>> wrote:
Bummer. After reading your conversation, I wish there were an easier way. We 
will have the same issue, as we have a few dozen tables that are used very 
frequently in joins, and I was hoping there was an easy way to replicate them 
on most of the nodes to avoid broadcasts every time.

On Thu, Apr 12, 2018 at 7:26 AM, Clifford Resnick 
<[email protected]<mailto:[email protected]>> wrote:
The table in our case is 12x hashed and ranged by month, so the broadcasts were 
often to all (12) nodes.

On Apr 12, 2018 12:58 AM, Mauricio Aristizabal 
<[email protected]<mailto:[email protected]>> wrote:
Sorry I left that out Cliff, FWIW it does seem to have been broadcast..

[inline screenshot not preserved]

Not sure, though, how a shuffle would be much different from a broadcast if the 
entire table is 1 file/block on 1 node.

On Wed, Apr 11, 2018 at 8:52 PM, Cliff Resnick 
<[email protected]<mailto:[email protected]>> wrote:
From the screenshot it does not look like there was a broadcast of the 
dimension table(s), so it could be the case here that the multiple smaller 
sends help. Our dim tables are generally in the single-digit millions of rows, 
and Impala chooses to broadcast them. Since the fact result cardinality is 
always much smaller, we've found that forcing a [shuffle] dimension join is 
actually faster, since it sends the dims only once rather than sending all of 
them to all nodes. The degraded performance of broadcast is especially obvious 
when the query returns zero results. I don't have much experience here, but it 
does seem that Kudu's efficient predicate scans can sometimes "break" Impala's 
query plan.
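
To put rough numbers on that, here is a back-of-the-envelope sketch. It is a simplified model, not Impala's actual cost formula, and the row counts and node count are hypothetical. It assumes a broadcast ships the whole dimension to every node, while a shuffle ships each row on either side once.

```python
def broadcast_rows_sent(dim_rows: int, num_nodes: int) -> int:
    """Broadcast: every node receives a full copy of the dimension side."""
    return dim_rows * num_nodes


def shuffle_rows_sent(dim_rows: int, probe_rows: int) -> int:
    """Shuffle: each row on both sides is sent once, to the node owning its key."""
    return dim_rows + probe_rows


# All three values below are hypothetical, chosen only for illustration.
dim_rows = 5_000_000   # a dimension in the "single-digit millions"
probe_rows = 100_000   # fact rows remaining after predicates
num_nodes = 12

print(broadcast_rows_sent(dim_rows, num_nodes))  # 60000000
print(shuffle_rows_sent(dim_rows, probe_rows))   # 5100000
```

Under these assumptions the broadcast moves roughly twelve times as many rows, and it moves them even when the probe side turns out to be empty.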

-Cliff

On Wed, Apr 11, 2018 at 5:41 PM, Mauricio Aristizabal 
<[email protected]<mailto:[email protected]>> wrote:
@Todd not to belabor the point, but when I suggested breaking up small dim 
tables into multiple parquet files (and in this thread's context perhaps 
partition kudu table, even if small, into multiple tablets), it was to speed up 
joins/exchanges, not to parallelize the scan.

For example, we recently ran into a slow query where the 14M-record dimension 
fit into a single file and block, so it was scanned on a single node. The scan 
itself was still quick (300ms), but it caused the join to take 25+ seconds and 
bogged down the entire query. See the highlighted fragment and its parent.

So we broke it into several small files the way I described in my previous 
post, and now join and query are fast (6s).

-m


[inline screenshot not preserved]


On Fri, Mar 16, 2018 at 3:55 PM, Todd Lipcon 
<[email protected]<mailto:[email protected]>> wrote:
I suppose in the case that the dimension table scan makes a non-trivial portion 
of your workload time, then yea, parallelizing the scan as you suggest would be 
beneficial. That said, in typical analytic queries, scanning the dimension 
tables is very quick compared to scanning the much-larger fact tables, so the 
extra parallelism on the dim table scan isn't worth too much.

-Todd

On Fri, Mar 16, 2018 at 2:56 PM, Mauricio Aristizabal 
<[email protected]<mailto:[email protected]>> wrote:
@Todd I know working with parquet in the past I've seen small dimensions that 
fit in 1 single file/block limit parallelism of join/exchange/aggregation 
nodes, and I've forced those dims to spread across 20 or so blocks by 
leveraging SET PARQUET_FILE_SIZE=8m; or similar when doing INSERT OVERWRITE to 
load them, which then allows these operations to parallelize across that many 
nodes.
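
A rough sketch of the arithmetic behind that, in Python. The 160 MB table size and the 20-node cluster are hypothetical, the 256 MB figure is assumed to be the default file size, and real file counts will only approximate this because Impala treats the setting as a target.

```python
import math

MB = 1024 * 1024


def approx_file_count(table_bytes: int, parquet_file_size: int) -> int:
    """Approximate number of Parquet files written for a given target size."""
    return max(1, math.ceil(table_bytes / parquet_file_size))


def usable_nodes(file_count: int, num_nodes: int) -> int:
    """Upper bound on nodes that can take part if work is split per file/block."""
    return min(file_count, num_nodes)


table_bytes = 160 * MB  # hypothetical dimension table size
num_nodes = 20          # hypothetical cluster size

for file_size in (256 * MB, 8 * MB):  # assumed default vs. the 8m setting
    files = approx_file_count(table_bytes, file_size)
    print(file_size // MB, files, usable_nodes(files, num_nodes))
# 256 1 1
# 8 20 20
```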

Wouldn't it be useful here for Cliff's small dims to be partitioned into a 
couple tablets to similarly improve parallelism?

-m

On Fri, Mar 16, 2018 at 2:29 PM, Todd Lipcon 
<[email protected]<mailto:[email protected]>> wrote:
On Fri, Mar 16, 2018 at 2:19 PM, Cliff Resnick 
<[email protected]<mailto:[email protected]>> wrote:
Hey Todd,

Thanks for that explanation, as well as all the great work you're doing  -- 
it's much appreciated! I just have one last follow-up question. Reading about 
BROADCAST operations (Kudu, Spark, Flink, etc.) it seems the smaller table is 
always copied in its entirety BEFORE the predicate is evaluated.

That's not quite true. If you have a predicate on a joined column, or on one of 
the columns in the joined table, it will be pushed down to the "scan" operator, 
which happens before the "exchange". In addition, there is a feature called 
"runtime filters" that can push dynamically-generated filters from one side of 
the exchange to the other.
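
As a toy illustration of why the order matters (a sketch only, with made-up rows and column names, not Impala's operators): when the predicate is applied in the scan, only the surviving rows reach the broadcast exchange.

```python
# Hypothetical dimension rows; the column names are invented for illustration.
dim_rows = [
    {"id": 1, "country": "US"},
    {"id": 2, "country": "DE"},
    {"id": 3, "country": "US"},
    {"id": 4, "country": "FR"},
]


def scan(rows, predicate=None):
    """Scan operator: applies a pushed-down predicate, if any, at the source."""
    return [r for r in rows if predicate is None or predicate(r)]


def broadcast(rows, num_nodes):
    """Exchange operator: returns how many row copies cross the network."""
    return len(rows) * num_nodes


num_nodes = 3  # hypothetical
no_pushdown = broadcast(scan(dim_rows), num_nodes)
with_pushdown = broadcast(scan(dim_rows, lambda r: r["country"] == "US"), num_nodes)
print(no_pushdown, with_pushdown)  # 12 6
```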

But since the Kudu client provides a serialized scanner as part of the 
ScanToken API, why wouldn't Impala use that instead if it knows that the table 
is Kudu and the query has any type of predicate? Perhaps if I hash-partition 
the table I could maybe force this (because that complicates a BROADCAST)? I 
guess this is really a question for Impala but perhaps there is a more basic 
reason.

Impala could definitely be smarter, just a matter of programming Kudu-specific 
join strategies into the optimizer. Today, the optimizer isn't aware of the 
unique properties of Kudu scans vs other storage mechanisms.

-Todd


-Cliff

On Fri, Mar 16, 2018 at 4:10 PM, Todd Lipcon 
<[email protected]<mailto:[email protected]>> wrote:
On Fri, Mar 16, 2018 at 12:30 PM, Clifford Resnick 
<[email protected]<mailto:[email protected]>> wrote:
I thought I had read that the Kudu client can configure a scan for 
CLOSEST_REPLICA and assumed this was a way to take advantage of data 
collocation.

Yea, when a client uses CLOSEST_REPLICA it will read a local one if available. 
However, that doesn't influence the higher level operation of the Impala (or 
Spark) planner. The planner isn't aware of the replication policy, so it will 
use one of the existing supported JOIN strategies. Given statistics, it will 
choose to broadcast the small table, which means that it will create a plan 
that looks like:


                                   +-------------------------+
                                   |                         |
                        +---------->build      JOIN          |
                        |          |                         |
                        |          |              probe      |
                 +--------------+  +-------------------------+
                 |              |                  |
                 | Exchange     |                  |
            +----+ (broadcast)  |                  |
            |    |              |                  |
            |    +--------------+                  |
            |                                      |
      +---------+                                  |
      |         |                        +-----------------------+
      |  SCAN   |                        |                       |
      |  KUDU   |                        |   SCAN (other side)   |
      |         |                        |                       |
      +---------+                        +-----------------------+

(hopefully the ASCII art comes through)

In other words, the "scan kudu" operator scans the table once, and then 
replicates the results of that scan into the JOIN operator. The "scan kudu" 
operator of course will read its local copy, but it will still go through the 
exchange process.

For the use case you're talking about, where the join is just looking up a 
single row by PK in a dimension table, ideally we'd be using an altogether 
different join strategy such as nested-loop join, with the inner "loop" 
actually being a Kudu PK lookup, but that strategy isn't implemented by Impala.
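
A minimal sketch of the two strategies, to make the contrast concrete. Everything here is hypothetical: a Python dict stands in for the Kudu dimension table, and `dict.get` stands in for a primary-key lookup. Neither function reflects how Impala or the Kudu client is actually implemented.

```python
# Hypothetical data: dim_table stands in for a small Kudu table keyed by PK.
dim_table = {1: "alpha", 2: "beta", 3: "gamma"}
fact_rows = [(2, 10.0), (2, 5.5), (9, 1.0)]  # (dim_id, amount)


def broadcast_hash_join(facts, dim_scan):
    """Scan the whole dimension, build a hash table, then probe it."""
    build = dict(dim_scan)  # every dimension row is read and shipped
    for dim_id, amount in facts:
        if dim_id in build:
            yield dim_id, build[dim_id], amount


def nested_loop_pk_join(facts, pk_lookup):
    """For each fact row, fetch only the matching dimension row by key."""
    for dim_id, amount in facts:
        name = pk_lookup(dim_id)  # stands in for a point lookup by primary key
        if name is not None:
            yield dim_id, name, amount


hash_result = list(broadcast_hash_join(fact_rows, dim_table.items()))
loop_result = list(nested_loop_pk_join(fact_rows, dim_table.get))
print(hash_result == loop_result)  # True
```

Both produce the same rows; the difference is that the second never reads or ships dimension rows that no fact row asks for.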

-Todd


If this exists, then how far out of context is my understanding of it? Reading 
about HDFS cache replication, I do know that Impala will choose a random 
replica there to more evenly distribute load. But especially compared to Kudu 
upsert, managing mutable data using Parquet is painful. So, perhaps to sum 
things up: if nearly 100% of my metadata scans are single primary key lookups 
followed by a tiny broadcast, then am I really just splitting hairs 
performance-wise between Kudu and HDFS-cached Parquet?

From:  Todd Lipcon <[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Date: Friday, March 16, 2018 at 2:51 PM

To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: "broadcast" tablet replication for kudu?

It's worth noting that, even if your table is replicated, Impala's planner is 
unaware of this fact and it will give the same plan regardless. That is to say, 
rather than every node scanning its local copy, instead a single node will 
perform the whole scan (assuming it's a small table) and broadcast it from 
there within the scope of a single query. So, I don't think you'll see any 
performance improvements on Impala queries by attempting something like an 
extremely high replication count.

I could see bumping the replication count to 5 for these tables since the extra 
storage cost is low and it will ensure higher availability of the important 
central tables, but I'd be surprised if there is any measurable perf impact.

-Todd

On Fri, Mar 16, 2018 at 11:35 AM, Clifford Resnick 
<[email protected]<mailto:[email protected]>> wrote:
Thanks for that, glad I was wrong there! Aside from replication considerations, 
is it also recommended the number of tablet servers be odd?

I will check forums as you suggested, but what I found after searching is that 
Impala relies on user-configured caching strategies using HDFS cache.  The 
workload for these tables is very light on writes, maybe a dozen or so records 
per hour across 6 or 7 tables. The size of the tables ranges from thousands to 
low millions of rows, so sub-partitioning would not be required. So perhaps 
this is not a typical use case, but I think it could work quite well with Kudu.

From: Dan Burkert <[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Date: Friday, March 16, 2018 at 2:09 PM
To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: "broadcast" tablet replication for kudu?

The replication count is the number of tablet servers which Kudu will host 
copies on.  So if you set the replication level to 5, Kudu will put the data on 
5 separate tablet servers.  There's no built-in broadcast table feature; upping 
the replication factor is the closest thing.  A couple of things to keep in 
mind:

- Always use an odd replication count.  This is important due to how the Raft 
algorithm works.  Recent versions of Kudu won't even let you specify an even 
number without flipping some flags.
- We don't test much beyond 5 replicas.  It should work, but you may run 
into issues since it's a relatively rare configuration.  With a heavy write 
workload and many replicas you are even more likely to encounter issues.
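
To see why an even count buys nothing, here is a small sketch of the majority arithmetic that Raft-style consensus relies on. It is generic quorum math, not Kudu code.

```python
def majority(replicas: int) -> int:
    """Smallest number of replicas that forms a strict majority."""
    return replicas // 2 + 1


def tolerated_failures(replicas: int) -> int:
    """How many replicas can be lost while a majority is still reachable."""
    return replicas - majority(replicas)


for n in (3, 4, 5, 6, 7):
    print(n, majority(n), tolerated_failures(n))
# 3 2 1
# 4 3 1
# 5 3 2
# 6 4 2
# 7 4 3
```

Going from 3 to 4 replicas (or 5 to 6) adds storage and write traffic without tolerating any additional failure.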

It's also worth checking in an Impala forum whether it has features that make 
joins against small broadcast tables better.  Perhaps Impala can cache small 
tables locally when doing joins.

- Dan

On Fri, Mar 16, 2018 at 10:55 AM, Clifford Resnick 
<[email protected]<mailto:[email protected]>> wrote:
The problem is, AFAIK, that replication count is not necessarily the 
distribution count, so you can't guarantee all tablet servers will have a copy.

On Mar 16, 2018 1:41 PM, Boris Tyukin 
<[email protected]<mailto:[email protected]>> wrote:
I'm new to Kudu but we are also going to use Impala mostly with Kudu. We have a 
few tables that are small but used a lot. My plan is to replicate them more 
than 3 times. When you create a Kudu table, you can specify the number of 
replicated copies (3 by default), and I guess you can put a number there that 
corresponds to the node count in your cluster. The downside is that you cannot 
change that number unless you recreate the table.

On Fri, Mar 16, 2018 at 10:42 AM, Cliff Resnick 
<[email protected]<mailto:[email protected]>> wrote:
We will soon be moving our analytics from AWS Redshift to Impala/Kudu. One 
Redshift feature that we will miss is its ALL Distribution, where a copy of a 
table is maintained on each server. We define a number of metadata tables this 
way since they are used in nearly every query. We are considering using parquet 
in HDFS cache for these, and Kudu would be a much better fit for the update 
semantics but we are worried about the additional contention.  I'm wondering if 
having a Broadcast, or ALL, tablet replication might be an easy feature to add 
to Kudu?

-Cliff





--
Todd Lipcon
Software Engineer, Cloudera










--
MAURICIO ARISTIZABAL
Architect - Business Intelligence + Data Science
[email protected] | (m) +1 323 309 4260
223 E. De La Guerra St. | Santa Barbara, CA 93101
Overview <http://www.impactradius.com/?src=slsap> | 
Twitter <https://twitter.com/impactradius> | 
Facebook <https://www.facebook.com/pages/Impact-Radius/153376411365183> | 
LinkedIn <https://www.linkedin.com/company/impact-radius-inc->






--
Mauricio Aristizabal
Architect - Data Pipeline

M  323 309 4260
E  [email protected]  |  W  https://impact.com
LinkedIn <https://www.linkedin.com/company/608678/> | 
Facebook <https://www.facebook.com/ImpactMarTech/> | 
Twitter <https://twitter.com/impactmartech>





