Hi, I was following this thread. I tried applying the patch from the JIRA ( https://issues.apache.org/jira/browse/TEZ-3287 ) manually [the one referenced in the reply above for the auto-reducer optimization in the shuffle hash join case]. I applied it to 0.8.3, while the patch was against master. But I got a comment from the patch author that it wouldn't affect hive.tez.auto.reducer.parallelism=true. Am I missing something?
Thanks,
Lalitha

On Thu, Jun 30, 2016 at 9:11 AM, Gopal Vijayaraghavan <gop...@apache.org> wrote:
>
> > 1. In the query plan, it still says Map Join Operator (I would have
> > expected it to be named as a reduce-side operator).
>
> The "Map" in that case really refers to Map<K,V> rather than the Hadoop
> version. A less ambiguous name would be HashJoinOperator.
>
> This is one of the optimizations in Tez right now: a map-join can be
> inserted in any vertex, because "Map 1" is really just a name (it is
> a vertex).
>
> Also, even if the input format is Text/SequenceFile, the reduce-side
> vectorization can vectorize the simple join cases because it is not tied
> to the input format anymore.
>
> > 2. The edges in this query plan were named custom_simple_edge. Is
> > this the one pointing to the fact that sorting of mapper inputs is
> > bypassed?
>
> Not directly related, but the custom edges do their own edge manager - the
> edge manager that is there could possibly be replaced with a simple edge +
> unsorted input-output pairs since tez-0.5.x.
>
> But the edge has an extension which can do some non-simple things too,
> which is why Tez supports edge overrides like this.
>
> <http://www.slideshare.net/Hadoop_Summit/w-235phall1pandey/13>
>
> > 3. "hive.tez.auto.reducer.parallelism=true" is not taking effect for
> > shuffle hash join.
>
> That issue was already reported by Twitter; the unsorted edges do not send
> out the output-size bitsets.
>
> <https://issues.apache.org/jira/browse/TEZ-3287>
>
> > 1. What does tez.auto.reducer.parallelism do - does it only reduce the
> > number of reducers based on the actual size of the mapper output, or does
> > it do more?
>
> It does a bit more when PipelinedSorter is enabled.
>
> The sorted edges actually partition first and sort second. So the sort key
> is really (reducer-n, key), and the first few bytes of that information are
> stored in the metadata region of the sorter for a better L1 cache hit rate
> when sorting.
>
> So the more reducers there are, the faster it sorts. However, it
> compresses each reducer output independently, so slicing too thin produces
> bad network overheads.
>
> Auto-reducer parallelism exists so that you don't need to tune each query
> by hand to fit those trade-offs.
>
> > 2. I did not understand the intuition behind setting
> > hive.mapjoin.hybridgrace.hashtable=false (as mentioned in your previous
> > reply).
>
> Yes, it is the same impl from the wiki. But the grace hash join drops the
> hashtable if it spills between executions of the same vertex.
>
> The regular hash join does not reload the hashtable when the split changes;
> this means the grace hash join can take 4-5x more time than the optimized
> one.
>
> The time it takes to load the hashtable goes up, while the lookups aren't
> much different because the grace hash join has a bloom filter on top of it.
>
> If you have 35,000 splits and 800 containers, the hash-build time adds up
> pretty quickly.
>
> > 3. In general, for a map join in cluster mode, are these the actual steps
> > followed in Hive/Tez:
> > a. Hash table generation: Partitioned hash tables of the small table are
> > created across multiple containers. In each container, a part of the
> > small table is dealt with, and in each container the hash table is built.
>
> No, the broadcast tasks merely produce an unordered output - it is
> not a hashtable.
>
> This is done in parallel, as you describe, across multiple containers on
> the cluster (it tries for locality next to the small tables).
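[A side note from me, just to check that I followed the partition-first sorting explanation above: below is a rough Java sketch of how I picture the (reducer-n, key) metadata comparison working. This is not the actual Tez PipelinedSorter code - the Meta layout, the 4-byte prefix and the class names are made up for illustration.]

import java.util.Arrays;
import java.util.Comparator;

// Rough sketch only, NOT the actual Tez PipelinedSorter code. The idea is that
// each record's sort entry keeps its target reducer (partition) plus a small
// key prefix in a compact metadata record, so most comparisons never have to
// touch the full serialized key bytes.
public class PartitionFirstSortSketch {

  // Hypothetical metadata entry: (partition, 4-byte key prefix, full key).
  static final class Meta {
    final int partition;   // which reducer this record is routed to
    final int keyPrefix;   // first 4 bytes of the key, packed into an int
    final byte[] fullKey;  // full serialized key, only read on prefix ties

    Meta(int partition, byte[] key) {
      this.partition = partition;
      this.fullKey = key;
      int prefix = 0;
      for (int i = 0; i < Math.min(4, key.length); i++) {
        prefix = (prefix << 8) | (key[i] & 0xff);
      }
      this.keyPrefix = prefix;
    }
  }

  // Partition first, key prefix second; the full key is only compared when
  // both the partition and the prefix tie.
  static final Comparator<Meta> PARTITION_FIRST = (a, b) -> {
    if (a.partition != b.partition) return Integer.compare(a.partition, b.partition);
    if (a.keyPrefix != b.keyPrefix) return Integer.compare(a.keyPrefix, b.keyPrefix);
    return Arrays.compare(a.fullKey, b.fullKey);
  };

  public static void main(String[] args) {
    int numReducers = 4;
    String[] keys = {"orange", "apple", "kiwi", "banana", "apple"};
    Meta[] entries = new Meta[keys.length];
    for (int i = 0; i < keys.length; i++) {
      int partition = (keys[i].hashCode() & Integer.MAX_VALUE) % numReducers;
      entries[i] = new Meta(partition, keys[i].getBytes());
    }
    Arrays.sort(entries, PARTITION_FIRST);
    // Records come out grouped by reducer and sorted within each group, so each
    // reducer's slice can be compressed and shipped independently.
    for (Meta m : entries) {
      System.out.println(m.partition + " -> " + new String(m.fullKey));
    }
  }
}

[If that picture is roughly right, it would explain why more reducers make the sort cheaper - more comparisons resolve on the partition/prefix alone - while slicing too thin hurts the independently compressed per-reducer outputs.]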
> > b. Broadcast of hash table: All the partitions of all the parts of the
> > small table, including the ones spilled to disk, are serialized and sent
> > to all the second-map containers.
>
> The broadcast is done via shuffle, same as sorted data movement, but one
> which reads the unordered streams and builds a hashtable inside every
> JoinOperator.
>
> The hashtable is then put into a cache in the task which has the scope of
> the vertex - if the same vertex re-runs on the same container, it will
> reload from the cache instead of the shuffle stream.
>
> The grace hashtable throws away its in-memory data when it reloads a spilled
> fraction of the hashtable, so the moment it has spilled it is no longer
> considered for reuse.
>
> > Where does the rebuilding of the spilt hash table happen? Is it during the
> > second map phase, where the join with the bigger table is happening?
>
> The split-hashtable looks exactly like the regular hashtable, but it has 3
> return values for the data - Yes, No, Ask-Later.
>
> So other than the handling of the Ask-Later scenario, the split-hashtable
> looks exactly like the full in-memory one.
>
> > c. Join operator: The big table is scanned in each second mapper against
> > the entire hash table of the small table, and the result is obtained.
>
> Yes.
>
> The Hadoop Summit slides from 2014 linked above are a little out of date,
> but they cover some of the basics of how this all fits together.
>
> Cheers,
> Gopal
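PS: To make sure I followed the Yes / No / Ask-Later part, here is roughly how I picture the probe side of a hashtable with spilled partitions. Again, this is just my own sketch with made-up names, not the real Hive hybrid grace hash join classes:

import java.util.HashMap;
import java.util.Map;

// My own sketch of a hashtable probe with three outcomes, mirroring the
// Yes / No / Ask-Later description above. Made-up names, not the real
// Hive hybrid grace hash join classes.
public class ThreeStateLookupSketch {

  enum LookupResult { MATCH, NO_MATCH, ASK_LATER }

  private final Map<String, String> inMemory = new HashMap<>();
  private final boolean[] partitionSpilled;  // true if that partition lives on disk
  private final int numPartitions;

  ThreeStateLookupSketch(int numPartitions) {
    this.numPartitions = numPartitions;
    this.partitionSpilled = new boolean[numPartitions];
  }

  // Load a small-table row; a spilled partition keeps nothing in memory here.
  void put(String key, String value, boolean spilled) {
    int p = partitionOf(key);
    if (spilled) {
      partitionSpilled[p] = true;  // the row is assumed to sit on disk
    } else {
      inMemory.put(key, value);
    }
  }

  // Probe with a big-table key. A spilled partition cannot answer now, so the
  // caller defers that row (Ask-Later) and re-joins it after the partition is
  // reloaded; everything else is an immediate yes/no.
  LookupResult probe(String bigTableKey) {
    int p = partitionOf(bigTableKey);
    if (partitionSpilled[p]) {
      return LookupResult.ASK_LATER;
    }
    return inMemory.containsKey(bigTableKey) ? LookupResult.MATCH : LookupResult.NO_MATCH;
  }

  // Toy partitioner (first character) so the demo below is deterministic.
  private int partitionOf(String key) {
    return key.charAt(0) % numPartitions;
  }

  public static void main(String[] args) {
    ThreeStateLookupSketch ht = new ThreeStateLookupSketch(2);
    ht.put("apple", "small-row-1", false);   // 'a' -> partition 1, stays in memory
    ht.put("banana", "small-row-2", true);   // 'b' -> partition 0, spilled to disk

    System.out.println(ht.probe("apple"));   // MATCH     -> join immediately
    System.out.println(ht.probe("cherry"));  // NO_MATCH  -> partition 1 is in memory, key absent
    System.out.println(ht.probe("banana"));  // ASK_LATER -> handled after the spilled partition reloads
  }
}

Please correct me if the deferred (Ask-Later) rows are not what gets re-joined once the spilled fraction of the hashtable is reloaded.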