[jira] [Updated] (TEZ-2358) Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task
[ https://issues.apache.org/jira/browse/TEZ-2358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajesh Balamohan updated TEZ-2358:
    Attachment: TEZ-2358.3.patch

Added a Preconditions check in MergeManager.closeOnDiskFile(). Since we need to consider only the filepath and offset, we need to iterate through all items in onDiskMapOutputs (as FileChunk includes filepath, offset, and length). This is still fine, as it won't be expensive and it makes debugging easier. [~gopalv] - Please have a look at the latest patch when you find time.

Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task

    Key: TEZ-2358
    URL: https://issues.apache.org/jira/browse/TEZ-2358
    Project: Apache Tez
    Issue Type: Bug
    Affects Versions: 0.7.0
    Reporter: Gopal V
    Assignee: Rajesh Balamohan
    Attachments: TEZ-2358.1.patch, TEZ-2358.2.patch, TEZ-2358.3.patch, syslog_attempt_1429683757595_0141_1_01_000143_0.syslog.bz2

The Tez MergeManager code assumes that the src-task-id is unique between merge operations; this results in some confusion when two merge sequences have to process output from the same src-task-id.

{code}
private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs,
    List<MapOutput> inMemoryMapOutputs, List<FileChunk> onDiskMapOutputs
...
    if (inMemoryMapOutputs.size() > 0) {
      int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier()
          .getInputIdentifier().getInputIndex();
...
      // must spill to disk, but can't retain in-mem for intermediate merge
      final Path outputPath = mapOutputFile.getInputFileForWrite(srcTaskId,
          inMemToDiskBytes).suffix(Constants.MERGED_OUTPUT_PREFIX);
...
{code}

This, or some related scenario, results in the following FileChunks list, which contains identically named paths with different lengths.

{code}
2015-04-23 03:28:50,983 INFO [MemtoDiskMerger [Map_1]] orderedgrouped.MergeManager: Initiating in-memory merge with 6 segments...
2015-04-23 03:28:50,987 INFO [MemtoDiskMerger [Map_1]] impl.TezMerger: Merging 6 sorted segments
2015-04-23 03:28:50,988 INFO [MemtoDiskMerger [Map_1]] impl.TezMerger: Down to the last merge-pass, with 6 segments left of total size: 1165944755 bytes
2015-04-23 03:28:58,495 INFO [MemtoDiskMerger [Map_1]] orderedgrouped.MergeManager: attempt_1429683757595_0141_1_01_000143_0_10027 Merge of the 6 files in-memory complete. Local file is /grid/5/cluster/yarn/local/usercache/gopal/appcache/application_1429683757595_0141/attempt_1429683757595_0141_1_01_000143_0_10027_spill_404.out.merged of size 785583965
2015-04-23 03:28:58,496 INFO [ShuffleAndMergeRunner [Map_1]] orderedgrouped.MergeManager: finalMerge called with 0 in-memory map-outputs and 5 on-disk map-outputs
2015-04-23 03:28:58,496 INFO [ShuffleAndMergeRunner [Map_1]] orderedgrouped.MergeManager: GOPAL: onDiskBytes = 365232290 += 365232290for/grid/4/cluster/yarn/local/usercache/gopal/appcache/application_1429683757595_0141/attempt_1429683757595_0141_1_01_000143_0_10027_spill_1023.out
2015-04-23 03:28:58,496 INFO [ShuffleAndMergeRunner [Map_1]] orderedgrouped.MergeManager: GOPAL: onDiskBytes = 730529899 += 365297609for/grid/5/cluster/yarn/local/usercache/gopal/appcache/application_1429683757595_0141/attempt_1429683757595_0141_1_01_000143_0_10027_spill_404.out
2015-04-23 03:28:58,496 INFO [ShuffleAndMergeRunner [Map_1]] orderedgrouped.MergeManager: GOPAL: onDiskBytes = 1095828683 += 365298784for/grid/5/cluster/yarn/local/usercache/gopal/appcache/application_1429683757595_0141/attempt_1429683757595_0141_1_01_000143_0_10027_spill_404.out
{code}

The multiple instances of 404.out indicate that we pulled two pipelined chunks of the same shuffle src id, once into memory and twice onto disk.
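The clobbering described above follows directly from deriving the merged-output name from the src-task-id alone. A minimal sketch of that failure mode (the path pattern and helper below are simplified stand-ins, not the actual Tez naming code):

```java
import java.util.ArrayList;
import java.util.List;

public class SpillNameCollision {
    // Simplified stand-in for mapOutputFile.getInputFileForWrite(srcTaskId, size)
    // plus Constants.MERGED_OUTPUT_PREFIX: the name depends only on srcTaskId.
    static String inputFileForWrite(int srcTaskId) {
        return "attempt_x_spill_" + srcTaskId + ".out.merged";
    }

    public static void main(String[] args) {
        List<String> onDiskMapOutputs = new ArrayList<>();
        // Two merge passes over pipelined chunks of the same source task (404)
        // generate the same path, so the second merge clobbers the first.
        onDiskMapOutputs.add(inputFileForWrite(404));
        onDiskMapOutputs.add(inputFileForWrite(404));
        System.out.println(onDiskMapOutputs.get(0).equals(onDiskMapOutputs.get(1))); // prints true
    }
}
```

With non-pipelined shuffle this was safe, because each source task produced exactly one merged output; pipelined shuffle breaks that one-merge-per-source-task invariant.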
{code}
2015-04-23 03:28:08,256 INFO [TezTaskEventRouter[attempt_1429683757595_0141_1_01_000143_0]] orderedgrouped.ShuffleInputEventHandlerOrderedGrouped: DME srcIdx: 143, targetIdx: 404, attemptNum: 0, payload: [hasEmptyPartitions: true, host: cn047-10.l42scl.hortonworks.com, port: 13562, pathComponent: attempt_1429683757595_0141_1_00_000404_0_10009_0, runDuration: 0]
2015-04-23 03:28:08,270 INFO [TezTaskEventRouter[attempt_1429683757595_0141_1_01_000143_0]] orderedgrouped.ShuffleInputEventHandlerOrderedGrouped: DME srcIdx: 143, targetIdx: 404, attemptNum: 0, payload: [hasEmptyPartitions: true, host: cn047-10.l42scl.hortonworks.com, port: 13562, pathComponent: attempt_1429683757595_0141_1_00_000404_0_10009_1, runDuration: 0]
2015-04-23 03:28:08,272 INFO [TezTaskEventRouter[attempt_1429683757595_0141_1_01_000143_0]] orderedgrouped.ShuffleInputEventHandlerOrderedGrouped: DME srcIdx: 143, targetIdx: 404, attemptNum: 0, payload: [hasEmptyPartitions: true, host: cn047-10.l42scl.hortonworks.com, port: 13562, pathComponent: attempt_1429683757595_0141_1_00_000404_0_10009_2, runDuration: 0]
{code}

This will fail depending on how many times _404_0 is at the top of the
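The Preconditions check described in the first comment (iterate over onDiskMapOutputs and reject a chunk whose filepath and offset are already present) might be sketched as follows; the FileChunk shape and the error message are assumptions for illustration, not the exact Tez code:

```java
import java.util.ArrayList;
import java.util.List;

public class MergeManagerSketch {
    // Minimal stand-in for Tez's FileChunk: filepath, offset, length.
    static class FileChunk {
        final String path;
        final long offset;
        final long length;
        FileChunk(String path, long offset, long length) {
            this.path = path;
            this.offset = offset;
            this.length = length;
        }
    }

    final List<FileChunk> onDiskMapOutputs = new ArrayList<>();

    // Sketch of the closeOnDiskFile() guard: a chunk with the same
    // (path, offset) must never be registered twice, or finalMerge would
    // count the same bytes more than once. Length is ignored on purpose,
    // since a clobbered file shows up as the same path/offset with a
    // different length.
    void closeOnDiskFile(FileChunk file) {
        for (FileChunk chunk : onDiskMapOutputs) {
            if (chunk.path.equals(file.path) && chunk.offset == file.offset) {
                throw new IllegalStateException(
                    "Duplicate on-disk chunk: " + file.path + " @ " + file.offset);
            }
        }
        onDiskMapOutputs.add(file);
    }

    public static void main(String[] args) {
        MergeManagerSketch m = new MergeManagerSketch();
        m.closeOnDiskFile(new FileChunk("spill_404.out", 0L, 365297609L));
        // Same path at a different offset is legal (pipelined chunks).
        m.closeOnDiskFile(new FileChunk("spill_404.out", 365297609L, 365298784L));
        try {
            m.closeOnDiskFile(new FileChunk("spill_404.out", 0L, 100L));
        } catch (IllegalStateException e) {
            System.out.println("rejected duplicate"); // prints rejected duplicate
        }
    }
}
```

The linear scan matches the comment's reasoning: onDiskMapOutputs stays small, so the check is cheap and the explicit failure makes debugging easier than silently double-merging.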
[jira] [Updated] (TEZ-2358) Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task
[ https://issues.apache.org/jira/browse/TEZ-2358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gopal V updated TEZ-2358:
    Priority: Blocker (was: Major)
[jira] [Updated] (TEZ-2358) Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task
[ https://issues.apache.org/jira/browse/TEZ-2358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajesh Balamohan updated TEZ-2358:
    Attachment: TEZ-2358.4.patch

Sure, addressing review comments in the latest patch.
[jira] [Updated] (TEZ-2358) Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task
[ https://issues.apache.org/jira/browse/TEZ-2358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajesh Balamohan updated TEZ-2358:
    Attachment: TEZ-2358.1.patch

Uploading the right patch.
[jira] [Updated] (TEZ-2358) Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task
[ https://issues.apache.org/jira/browse/TEZ-2358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajesh Balamohan updated TEZ-2358:
    Attachment: TEZ-2358.1.patch

Since the spill id was missing, it ended up clobbering the files. Attaching the .1 patch, which makes the file name unique with srcId + spillId. [~gopalv], [~sseth] Please have a look at the patch. MAX_VALUE in the patch would not cause a collision, as it is handled in finalMerge (which is invoked exactly once during merge close), and it would generate exactly one file with that spill id during the final merge.
[jira] [Updated] (TEZ-2358) Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task
[ https://issues.apache.org/jira/browse/TEZ-2358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gopal V updated TEZ-2358:
    Summary: Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task (was: Fix MergeManager assumptions about 1 merge per source-task)