[ 
https://issues.apache.org/jira/browse/MAPREDUCE-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13847999#comment-13847999
 ] 

Sandy Ryza commented on MAPREDUCE-5611:
---------------------------------------

If I understand correctly, the issue is that, even when the blocks referred to 
by a split reside on multiple nodes, CombineFileInputFormat creates the split 
with its locations referring to a single node.  The proposed fix is to create 
the split with all nodes that hold a replica of any block included in the 
split.  This would allow the scheduler to prefer placing the tasks on those 
nodes.
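To make the proposal concrete, here is a minimal sketch of the union idea (the class and method names are mine, not Hadoop's): the split's location list becomes every node that holds a replica of any block in the split.

```java
import java.util.*;

// Hypothetical sketch of the proposed fix (not actual Hadoop code):
// a split's locations = union of the replica nodes of all its blocks.
public class UnionLocations {
    static Set<String> union(List<Set<String>> blockReplicas) {
        Set<String> all = new TreeSet<>();   // sorted for stable output
        for (Set<String> replicas : blockReplicas) {
            all.addAll(replicas);            // keep any node holding any block
        }
        return all;
    }

    public static void main(String[] args) {
        // two blocks with partly overlapping replica sets
        List<Set<String>> blocks = Arrays.asList(
            new HashSet<>(Arrays.asList("node1", "node2", "node3")),
            new HashSet<>(Arrays.asList("node2", "node3", "node4")));
        System.out.println(union(blocks)); // [node1, node2, node3, node4]
    }
}
```

With many small files per split, this union can grow toward the whole cluster, which is exactly the regression described below.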

However, the proposed change could cause performance regressions when we are 
combining many small files.  Imagine a 1000-node cluster where we have created 
a split composed of 1000 small files that all have a replica on a single node.  
The other replicas of these files are likely spread across nodes all over the 
cluster.  With the proposed approach we would end up requesting every node in 
the cluster, even though we are really only likely to get a data-local speedup 
if the task is placed on the one node where all the files are together.

A fix that would not have this performance implication would be to set a 
split's locations to the *intersection* of the sets of nodes that its blocks 
reside on.  So if a split contains two blocks, one residing on node1, node2, 
and node3, and another residing on node2, node3, and node4, we would set the 
split's locations to node2 and node3.
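The intersection variant can be sketched the same way (again, hypothetical names, not the Hadoop API): only nodes that host *every* block in the split survive.

```java
import java.util.*;

// Hypothetical sketch (not actual Hadoop code): a split's locations =
// intersection of the replica node sets of all its blocks.
public class SplitLocations {
    static Set<String> intersection(List<Set<String>> blockReplicas) {
        if (blockReplicas.isEmpty()) return Collections.emptySet();
        Set<String> common = new HashSet<>(blockReplicas.get(0));
        for (Set<String> replicas : blockReplicas.subList(1, blockReplicas.size())) {
            common.retainAll(replicas);  // drop nodes missing any block
        }
        return common;
    }

    public static void main(String[] args) {
        // the two-block example from above
        List<Set<String>> blocks = Arrays.asList(
            new HashSet<>(Arrays.asList("node1", "node2", "node3")),
            new HashSet<>(Arrays.asList("node2", "node3", "node4")));
        System.out.println(intersection(blocks)); // node2 and node3
    }
}
```

Note the intersection can be empty for splits whose blocks share no common node; in that case we would presumably fall back to the current single-node behavior.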

If we choose to go with the second route, it would be good to do some quick 
back-of-the-envelope math to confirm that the time spent computing these 
intersections is worth the data-local speedup we could get.
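A rough version of that math, under assumed numbers: with hash sets, the intersection for one split costs about (blocks per split) x (replicas per block) lookups.

```java
// Back-of-the-envelope estimate (assumed workload numbers, not measurements):
// intersecting replica sets with HashSet.retainAll costs roughly one hash
// lookup per (block, replica) pair in each split.
public class IntersectionCost {
    static long estimatedLookups(long splits, long blocksPerSplit, long replicasPerBlock) {
        return splits * blocksPerSplit * replicasPerBlock;
    }

    public static void main(String[] args) {
        // e.g. 1000 splits of 1000 small files each, at replication 3
        System.out.println(estimatedLookups(1000, 1000, 3)); // 3000000
    }
}
```

A few million hash lookups is on the order of milliseconds on the client, while turning even one rack-local task data-local typically saves seconds of network transfer, so the intersection cost looks negligible under these assumptions.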


> CombineFileInputFormat creates more rack-local tasks due to less split 
> location info.
> -------------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-5611
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-5611
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>    Affects Versions: 1.2.1
>            Reporter: Chandra Prakash Bhagtani
>            Assignee: Chandra Prakash Bhagtani
>         Attachments: CombineFileInputFormat-trunk.patch
>
>
> I have come across an issue with CombineFileInputFormat. I ran a hive query 
> on approx 1.2 GB of data with CombineHiveInputFormat, which internally uses 
> CombineFileInputFormat. My cluster has 9 datanodes and max.split.size is 
> 256 MB.
> When I ran this query with replication factor 9, hive consistently creates 
> all 6 tasks rack-local, and with replication factor 3 it creates 5 rack-local 
> and 1 data-local task. 
>  When the replication factor is 9 (equal to the cluster size), all the tasks 
> should be data-local, as each datanode contains all the replicas of the input 
> data; but that is not happening, i.e. all the tasks are rack-local. 
> When I dug into the getMoreSplits method in CombineFileInputFormat.java, I 
> found the issue in the following snippet (especially in the case of higher 
> replication factors):
> {code:title=CombineFileInputFormat.java|borderStyle=solid}
> for (Iterator<Map.Entry<String,
>          List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
>      iter.hasNext();) {
>   Map.Entry<String, List<OneBlockInfo>> one = iter.next();
>   nodes.add(one.getKey());
>   List<OneBlockInfo> blocksInNode = one.getValue();
>   // for each block, copy it into validBlocks. Delete it from
>   // blockToNodes so that the same block does not appear in
>   // two different splits.
>   for (OneBlockInfo oneblock : blocksInNode) {
>     if (blockToNodes.containsKey(oneblock)) {
>       validBlocks.add(oneblock);
>       blockToNodes.remove(oneblock);
>       curSplitSize += oneblock.length;
>       // if the accumulated split size exceeds the maximum, then
>       // create this split.
>       if (maxSize != 0 && curSplitSize >= maxSize) {
>         // create an input split and add it to the splits array
>         addCreatedSplit(splits, nodes, validBlocks);
>         curSplitSize = 0;
>         validBlocks.clear();
>       }
>     }
>   }
> {code}
> The first node in the nodeToBlocks map has all the replicas of the input 
> file, so the above code creates 6 splits, each with only one location. Now if 
> the JT doesn't schedule these tasks on that node, all the tasks will be 
> rack-local, even though all the other datanodes hold the other replicas.



--
This message was sent by Atlassian JIRA
(v6.1.4#6159)
