[ 
https://issues.apache.org/jira/browse/MAPREDUCE-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13815716#comment-13815716
 ] 

Chandra Prakash Bhagtani commented on MAPREDUCE-5611:
-----------------------------------------------------

I am suggesting following small patch for this issue. Patched line is in *bold*

{code:title=CombineFileInputFormat.java|borderStyle=solid}
for (OneBlockInfo oneblock : blocksInNode) {
        if (blockToNodes.containsKey(oneblock)) {
          *nodes.addAll(Arrays.asList(blockToNodes.get(oneblock)));*
          validBlocks.add(oneblock);
          blockToNodes.remove(oneblock);
          curSplitSize += oneblock.length;

          // if the accumulated split size exceeds the maximum, then
          // create this split.
          if (maxSize != 0 && curSplitSize >= maxSize) {
            // create an input split and add it to the splits array
            addCreatedSplit(splits, nodes, validBlocks);
            curSplitSize = 0;
            validBlocks.clear();
          }
        }
      }
{code}

This will include all the nodes on which the replica lies in the location hash 
set, so that whichever node JT schedules the tasks it finds all the replicas 
local to that node (in case of replication = cluster size). This patch even 
helps in 3 replication factor, this fact is evident with the following numbers 
in my cluster

9 replicas without patch  (all 6 tasks are rack-local)
9 replicas with patch  (5 tasks are data-local and 1 rack-local)

3 replicas without patch  (1 data-local and 5 rack-local)
3 replicas with patch  (2 data-local and 4 rack-local)

Please let me know, if I can submit the above patch.

> CombineFileInputFormat creates more rack-local tasks due to less split 
> location info.
> -------------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-5611
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-5611
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>    Affects Versions: 2.2.0
>            Reporter: Chandra Prakash Bhagtani
>            Assignee: Chandra Prakash Bhagtani
>
> I have come across an issue with CombineFileInputFormat. Actually I ran a 
> hive query on approx 1.2 GB data with CombineHiveInputFormat which internally 
> uses CombineFileInputFormat. My cluster size is 9 datanodes and 
> max.split.size is 256 MB
> When I ran this query with replication factor 9, hive consistently creates 
> all 6 rack-local tasks and with replication factor 3 it creates 5 rack-local 
> and 1 data local tasks. 
>  When replication factor is 9 (equal to cluster size), all the tasks should 
> be data-local as each datanode contains all the replicas of the input data, 
> but that is not happening i.e all the tasks are rack-local. 
> When I dug into CombineFileInputFormat.java code in getMoreSplits method, I 
> found the issue with the following snippet (specially in case of higher 
> replication factor)
> {code:title=CombineFileInputFormat.java|borderStyle=solid}
> for (Iterator<Map.Entry<String,
>          List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
>          iter.hasNext();) {
>        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
>       nodes.add(one.getKey());
>       List<OneBlockInfo> blocksInNode = one.getValue();
>       // for each block, copy it into validBlocks. Delete it from
>       // blockToNodes so that the same block does not appear in
>       // two different splits.
>       for (OneBlockInfo oneblock : blocksInNode) {
>         if (blockToNodes.containsKey(oneblock)) {
>           validBlocks.add(oneblock);
>           blockToNodes.remove(oneblock);
>           curSplitSize += oneblock.length;
>           // if the accumulated split size exceeds the maximum, then
>           // create this split.
>           if (maxSize != 0 && curSplitSize >= maxSize) {
>             // create an input split and add it to the splits array
>             addCreatedSplit(splits, nodes, validBlocks);
>             curSplitSize = 0;
>             validBlocks.clear();
>           }
>         }
>       }
> {code}
> First node in the map nodeToBlocks has all the replicas of input file, so the 
> above code creates 6 splits all with only one location. Now if JT doesn't 
> schedule these tasks on that node, all the tasks will be rack-local, even 
> though all the other datanodes have all the other replicas.



--
This message was sent by Atlassian JIRA
(v6.1#6144)

Reply via email to