[
https://issues.apache.org/jira/browse/MAPREDUCE-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13861418#comment-13861418
]
Chandra Prakash Bhagtani commented on MAPREDUCE-5611:
-----------------------------------------------------
Agreed, the common nodes (the intersection) should be placed at the beginning
of the locations set, but I feel that having the locations of all the blocks
in the locations set will help scheduling and data locality. Two concerns:
1. In your example, one node has all 1000 replicas. What if the JT is not
able to schedule tasks on this node (slot limitation etc.)? Then it will pick
a random node and schedule the task there (with all the blocks non-local).
2. What if there is no intersection, i.e. no common nodes for the blocks in a
split?
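
The ordering discussed above (common nodes first, then every other node that holds at least one block of the split) could be sketched roughly as follows. This is only an illustration: the class SplitLocations, the method orderLocations and the host names are hypothetical and are not part of Hadoop or of the attached patch. When the blocks share no host, the intersection is empty and the result is simply the union of all hosts, which covers concern 2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Hadoop: orders candidate hosts for one
// combined split.
class SplitLocations {

  // blockHosts holds one host list per block in the split.
  static List<String> orderLocations(List<List<String>> blockHosts) {
    Set<String> common = new LinkedHashSet<>(); // hosts holding every block
    Set<String> all = new LinkedHashSet<>();    // hosts holding any block
    boolean first = true;
    for (List<String> hosts : blockHosts) {
      if (first) {
        common.addAll(hosts);
        first = false;
      } else {
        common.retainAll(hosts);
      }
      all.addAll(hosts);
    }
    // Common hosts go first; a LinkedHashSet keeps insertion order and
    // ignores hosts that are already present.
    Set<String> ordered = new LinkedHashSet<>(common);
    ordered.addAll(all);
    return new ArrayList<>(ordered);
  }

  public static void main(String[] args) {
    List<List<String>> blockHosts = Arrays.asList(
        Arrays.asList("dn1", "dn2", "dn3"),
        Arrays.asList("dn2", "dn3", "dn4"),
        Arrays.asList("dn3", "dn5", "dn2"));
    // prints [dn2, dn3, dn1, dn4, dn5]
    System.out.println(orderLocations(blockHosts));
  }
}
```

With this ordering the scheduler would still see the fully local nodes first, but it would also have partially local nodes to fall back on when those have no free slots.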
> CombineFileInputFormat only requests a single location per split when more
> could be optimal
> -------------------------------------------------------------------------------------------
>
> Key: MAPREDUCE-5611
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-5611
> Project: Hadoop Map/Reduce
> Issue Type: Bug
> Affects Versions: 1.2.1
> Reporter: Chandra Prakash Bhagtani
> Assignee: Chandra Prakash Bhagtani
> Attachments: CombineFileInputFormat-trunk.patch
>
>
> I have come across an issue with CombineFileInputFormat. I ran a Hive
> query on approx. 1.2 GB of data with CombineHiveInputFormat, which
> internally uses CombineFileInputFormat. My cluster has 9 datanodes and
> max.split.size is 256 MB.
> When I run this query with replication factor 9, Hive consistently creates
> 6 tasks, all rack-local; with replication factor 3 it creates 5 rack-local
> tasks and 1 data-local task.
> When the replication factor is 9 (equal to the cluster size), all the tasks
> should be data-local, as each datanode holds a replica of every block of the
> input data, but that is not happening, i.e. all the tasks are rack-local.
> When I dug into the getMoreSplits method in CombineFileInputFormat.java, I
> found the issue in the following snippet (especially in the case of a higher
> replication factor):
> {code:title=CombineFileInputFormat.java|borderStyle=solid}
> for (Iterator<Map.Entry<String,
>      List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
>      iter.hasNext();) {
>   Map.Entry<String, List<OneBlockInfo>> one = iter.next();
>   nodes.add(one.getKey());
>   List<OneBlockInfo> blocksInNode = one.getValue();
>   // for each block, copy it into validBlocks. Delete it from
>   // blockToNodes so that the same block does not appear in
>   // two different splits.
>   for (OneBlockInfo oneblock : blocksInNode) {
>     if (blockToNodes.containsKey(oneblock)) {
>       validBlocks.add(oneblock);
>       blockToNodes.remove(oneblock);
>       curSplitSize += oneblock.length;
>       // if the accumulated split size exceeds the maximum, then
>       // create this split.
>       if (maxSize != 0 && curSplitSize >= maxSize) {
>         // create an input split and add it to the splits array
>         addCreatedSplit(splits, nodes, validBlocks);
>         curSplitSize = 0;
>         validBlocks.clear();
>       }
>     }
>   }
>   // (the excerpt ends here; the outer loop is not closed in this snippet)
> {code}
> The first node in the nodeToBlocks map has replicas of all the blocks of the
> input file, so the above code creates 6 splits, each with only one location.
> Now if the JT doesn't schedule these tasks on that node, all the tasks will
> be rack-local, even though all the other datanodes hold replicas of the same
> blocks.
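
As a rough illustration of the behaviour described in the issue, the toy model below replays the node-by-node grouping on made-up data: 3 nodes that each hold all 12 blocks, 128-unit blocks and a 256-unit maximum split size. It is not Hadoop code. The class GreedySplitModel, the integer block ids and the host names are hypothetical, and it assumes that the node list is reset for every node and that leftover blocks smaller than the maximum are ignored, neither of which the quoted excerpt shows.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of the node-by-node grouping loop; not Hadoop code.
class GreedySplitModel {

  // Returns the location list of every split that gets created.
  static List<List<String>> splitLocations(
      Map<String, List<Integer>> nodeToBlocks, long blockLength, long maxSize) {
    // Blocks that have not been assigned to a split yet.
    Set<Integer> unassigned = new HashSet<>();
    for (List<Integer> blocks : nodeToBlocks.values()) {
      unassigned.addAll(blocks);
    }
    List<List<String>> splits = new ArrayList<>();
    for (Map.Entry<String, List<Integer>> one : nodeToBlocks.entrySet()) {
      // Assumption: the node list starts empty for every node.
      List<String> nodes = new ArrayList<>();
      nodes.add(one.getKey());
      long curSplitSize = 0;
      for (Integer block : one.getValue()) {
        // remove() returns true only if the block was still unassigned.
        if (unassigned.remove(block)) {
          curSplitSize += blockLength;
          if (maxSize != 0 && curSplitSize >= maxSize) {
            // The split records only the node currently being visited.
            splits.add(new ArrayList<>(nodes));
            curSplitSize = 0;
          }
        }
      }
      // Leftover blocks below maxSize are ignored in this model.
    }
    return splits;
  }

  public static void main(String[] args) {
    List<Integer> allBlocks = new ArrayList<>();
    for (int i = 0; i < 12; i++) {
      allBlocks.add(i);
    }
    Map<String, List<Integer>> nodeToBlocks = new LinkedHashMap<>();
    for (String node : new String[] {"dn1", "dn2", "dn3"}) {
      nodeToBlocks.put(node, allBlocks); // every node holds every block
    }
    // prints [[dn1], [dn1], [dn1], [dn1], [dn1], [dn1]]
    System.out.println(splitLocations(nodeToBlocks, 128, 256));
  }
}
```

In this model the first node visited consumes every block, so all six splits list only that node and the other two nodes never appear as locations.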