Hi,

I was following this thread. I tried applying the patch from the JIRA
( https://issues.apache.org/jira/browse/TEZ-3287 ) manually [referenced in
the above reply for auto reducer optimization in the shuffle hash join
case]. I applied it to 0.8.3 while the patch was against master.
But I got a comment from the author that the patch wouldn't affect
hive.tez.auto.reducer.parallelism=true.
Am I missing something?


Thanks,
Lalitha

On Thu, Jun 30, 2016 at 9:11 AM, Gopal Vijayaraghavan <gop...@apache.org>
wrote:

>
> > 1. In the query plan, it still says Map Join Operator (Would have
> >expected it to be named as Reduce side operator).
>
> The "Map" in that case really refers to Map<K,V> rather than the hadoop
> version. An unambiguous name would be HashJoinOperator.
>
> This is one of the optimizations in Tez right now: a map-join can be
> inserted into any vertex, because "Map 1" is really just a name (it is
> a vertex).
>
> Also, even if the input format were Text/SequenceFile, reduce-side
> vectorization can vectorize the simple join cases because it is no
> longer tied to the input format.
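>
> For illustration, a minimal sketch of how to see this in a plan (the
> table names here are made up):
>
>   set hive.auto.convert.join=true;
>   -- small_dim is assumed to fit in memory, big_fact is not
>   explain
>   select f.id, d.name
>   from big_fact f
>   join small_dim d on f.dim_id = d.id;
>
> The operator shows up as "Map Join Operator" in whichever vertex ends up
> building and probing the hashtable, whether that vertex is labelled
> "Map 2" or "Reducer 3".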
>
> > 2. The edges in this query plan were named custom_simple_edge: Is
> >this the one pointing to the fact that sorting of mapper inputs is
> >bypassed?
>
> Not directly related, but the custom edges use their own EdgeManager - the
> EdgeManager that is there could probably be replaced with a simple edge +
> unsorted input-output pairs since tez-0.5.x.
>
> But the edge has an extension which can do some non-simple things too,
> which is why Tez supports edge overrides like this.
>
>  <http://www.slideshare.net/Hadoop_Summit/w-235phall1pandey/13>
>
>
> > 3. "hive.tez.auto.reducer.parallelism=true" is not taking effect for
> >shuffle hash join.
>
> That issue was already reported by Twitter: the unsorted edges do not send
> out the output size bitsets.
>
>  <https://issues.apache.org/jira/browse/TEZ-3287>
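>
> For context, the combination being discussed would look something like
> this (assuming hive.optimize.dynamic.partition.hashjoin is the switch
> used for the shuffle hash join here):
>
>   set hive.auto.convert.join=true;
>   -- reduce-side (shuffle) hash join instead of a broadcast map-join
>   set hive.optimize.dynamic.partition.hashjoin=true;
>   -- ask Tez to shrink the reducer count at runtime
>   set hive.tez.auto.reducer.parallelism=true;
>
> Since the unsorted edges do not report their output sizes, the last
> setting has nothing to act on - that is what TEZ-3287 tracks.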
>
>
> > 1. What does tez.auto.reducer.parallelism do -- Does it only reduce the
> >number of reducers based on the actual size of mapper output, or does it
> >do more.
>
> It does a bit more when PipelineSorter is enabled.
>
> The sorted edges actually partition first and sort afterwards. So the
> sort key is actually (reducer-n, key) & the first few bytes of that
> information are stored in the metadata region of the sorter for a better
> L1 cache hit-rate when sorting.
>
> So the more reducers there are, the faster it sorts. However, it
> compresses each reducer output independently, so slicing too thin produces
> bad network overheads.
>
> Auto-reducer parallelism exists so that you don't need to tune each query
> by hand to fit those trade-offs.
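>
> As a rough sketch, these are the knobs that feed into that trade-off
> (the values are illustrative only, not recommendations):
>
>   set hive.tez.auto.reducer.parallelism=true;
>   -- initial estimate: bytes of input per reducer
>   set hive.exec.reducers.bytes.per.reducer=256000000;
>   -- bounds on how far the estimated reducer count may be scaled
>   set hive.tez.min.partition.factor=0.25;
>   set hive.tez.max.partition.factor=2.0;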
>
> > 2. I did not understand the intuition behind setting
> >hive.mapjoin.hybridgrace.hashtable=false (as mentioned in your previous
> >reply).
>
> Yes, it is the same impl from the wiki. But the grace hashjoin drops the
> hashtable if it spills between executions of the same vertex.
>
> The regular hash join does not reload the hashtable when the split changes,
> which means the grace hashjoin can take 4-5x more time than the optimized
> one.
>
> The time it takes to load the hashtable goes up, while the lookups aren't
> much different because the grace hash-join has a bloom filter on top of it.
>
> If you have 35,000 splits and 800 containers, the hash-build time adds up
> pretty quickly.
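>
> That is the reasoning behind the earlier suggestion; the setting in
> question is simply:
>
>   -- fall back to the regular in-memory hashtable, which gets reused
>   -- when the split changes within the same vertex
>   set hive.mapjoin.hybridgrace.hashtable=false;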
>
> > 3. In general, map join  in cluster mode, are these the actual steps
> >followed in hive/tez:
> > a. Hash table generation:  Partitioned hash tables of the small table are
> >created across multiple containers. In each container, a part of the
> >small table is dealt with, and in each container the hash table is built.
>
> No, the broadcast tasks merely produce an unordered output - it is
> not a hashtable.
>
> This is done in parallel as you describe, across multiple containers & on
> the cluster (it tries for locality next to the small tables).
>
> > b. Broadcast of hash table: All the partitions of all the parts of the
> >small table, including the ones spilled to disk, are serialized and sent
> >to all the second map containers.
>
> The broadcast is done via shuffle, same as sorted data movement, but one
> which reads the unordered streams and builds a hashtable inside every
> JoinOperator.
>
> The hashtable is then put into a cache in the task, scoped to the vertex -
> if the same vertex re-runs on the same container, it will reload from the
> cache instead of the shuffle stream.
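>
> A small sketch of the settings that interact with that caching (the
> size value is illustrative only):
>
>   -- keep containers around so re-runs of the same vertex can hit the cache
>   set tez.am.container.reuse.enabled=true;
>   -- rough size threshold below which the small side is broadcast and hashed
>   set hive.auto.convert.join.noconditionaltask.size=100000000;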
>
> The grace hashtable throws away in-mem data when it reloads a spilled
> fraction of the hashtable, so the moment it has spilled it is no longer
> considered for reuse.
>
> > Where does the rebuilding of the spilled hash table happen? Is it during
> >the second map phase, where the join with the bigger table happens?
>
> The spilled hashtable looks exactly like the regular hashtable, but it has
> 3 return values for the data - Yes, No, Ask-Later.
>
> So other than the handling of the Ask-Later scenario, the spilled hashtable
> looks exactly like the full in-mem one.
>
> > c. Join operator:  The big table is scanned in each second mapper against
> >the entire hash table of the small table, and the result is produced.
>
> Yes.
>
> The Hadoop Summit slides from 2014 linked above are a little out of date,
> but they cover some of the basics of how this all fits together.
>
> Cheers,
> Gopal
>
>
>
