[jira] [Updated] (TEZ-2358) Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task

2015-04-27 Thread Rajesh Balamohan (JIRA)

 [ 
https://issues.apache.org/jira/browse/TEZ-2358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajesh Balamohan updated TEZ-2358:
--
Attachment: TEZ-2358.3.patch

Added a preconditions check in MergeManager.closeOnDiskFile().  Since we need to 
consider only filepath & offset, we iterate through all items in 
onDiskMapOutputs (as FileChunk includes filepath, offset, and length). This is 
still fine, as it is not expensive and it makes debugging easier.
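For illustration, here is a minimal sketch of the kind of duplicate check described above. The class and method names below are hypothetical stand-ins, not the actual patch: a `Chunk` models Tez's FileChunk (filepath, offset, length), and a linear scan flags any chunk whose (path, offset) pair is already present.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the duplicate check: a chunk is a duplicate when a
// chunk with the same path AND offset already exists in the on-disk list.
class DuplicateChunkCheck {
    // Stand-in for Tez's FileChunk: filepath + offset + length.
    static final class Chunk {
        final String path; final long offset; final long length;
        Chunk(String path, long offset, long length) {
            this.path = path; this.offset = offset; this.length = length;
        }
    }

    // Linear scan over the on-disk outputs: cheap for the small lists involved,
    // and easy to debug, as noted in the comment above.
    static boolean isDuplicate(List<Chunk> onDisk, Chunk candidate) {
        for (Chunk c : onDisk) {
            if (c.path.equals(candidate.path) && c.offset == candidate.offset) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Chunk> onDisk = new ArrayList<>();
        onDisk.add(new Chunk("spill_404.out", 0, 365297609L));
        // Same path, same offset -> duplicate (would trip the precondition).
        System.out.println(isDuplicate(onDisk, new Chunk("spill_404.out", 0, 1L)));
        // Same path, different offset -> legitimately distinct chunk.
        System.out.println(isDuplicate(onDisk, new Chunk("spill_404.out", 100, 1L)));
    }
}
```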

[~gopalv] - Please have a look at the latest patch when you find time.

 Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task
 -

 Key: TEZ-2358
 URL: https://issues.apache.org/jira/browse/TEZ-2358
 Project: Apache Tez
  Issue Type: Bug
Affects Versions: 0.7.0
Reporter: Gopal V
Assignee: Rajesh Balamohan
 Attachments: TEZ-2358.1.patch, TEZ-2358.2.patch, TEZ-2358.3.patch, 
 syslog_attempt_1429683757595_0141_1_01_000143_0.syslog.bz2


 The Tez MergeManager code assumes that the src-task-id is unique between 
 merge operations; this causes confusion when two merge sequences 
 have to process output from the same src-task-id.
 {code}
 private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs,
     List<MapOutput> inMemoryMapOutputs,
     List<FileChunk> onDiskMapOutputs
 ...
   if (inMemoryMapOutputs.size() > 0) {
     int srcTaskId =
         inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getInputIndex();
 ...
     // must spill to disk, but can't retain in-mem for intermediate merge
     final Path outputPath =
         mapOutputFile.getInputFileForWrite(srcTaskId,
             inMemToDiskBytes).suffix(
                 Constants.MERGED_OUTPUT_PREFIX);
 ...
 {code}
 This, or some related scenario, results in the following FileChunks 
 list, which contains identically named paths with different lengths.
 {code}
 2015-04-23 03:28:50,983 INFO [MemtoDiskMerger [Map_1]] 
 orderedgrouped.MergeManager: Initiating in-memory merge with 6 segments...
 2015-04-23 03:28:50,987 INFO [MemtoDiskMerger [Map_1]] impl.TezMerger: 
 Merging 6 sorted segments
 2015-04-23 03:28:50,988 INFO [MemtoDiskMerger [Map_1]] impl.TezMerger: Down 
 to the last merge-pass, with 6 segments left of total size: 1165944755 bytes
 2015-04-23 03:28:58,495 INFO [MemtoDiskMerger [Map_1]] 
 orderedgrouped.MergeManager: attempt_1429683757595_0141_1_01_000143_0_10027 
 Merge of the 6 files in-memory complete. Local file is 
 /grid/5/cluster/yarn/local/usercache/gopal/appcache/application_1429683757595_0141/attempt_1429683757595_0141_1_01_000143_0_10027_spill_404.out.merged
  of size 785583965
 2015-04-23 03:28:58,496 INFO [ShuffleAndMergeRunner [Map_1]] 
 orderedgrouped.MergeManager: finalMerge called with 0 in-memory map-outputs 
 and 5 on-disk map-outputs
 2015-04-23 03:28:58,496 INFO [ShuffleAndMergeRunner [Map_1]] 
 orderedgrouped.MergeManager: GOPAL: onDiskBytes = 365232290 += 
 365232290 for /grid/4/cluster/yarn/local/usercache/gopal/appcache/application_1429683757595_0141/attempt_1429683757595_0141_1_01_000143_0_10027_spill_1023.out
 2015-04-23 03:28:58,496 INFO [ShuffleAndMergeRunner [Map_1]] 
 orderedgrouped.MergeManager: GOPAL: onDiskBytes = 730529899 += 
 365297609 for /grid/5/cluster/yarn/local/usercache/gopal/appcache/application_1429683757595_0141/attempt_1429683757595_0141_1_01_000143_0_10027_spill_404.out
 2015-04-23 03:28:58,496 INFO [ShuffleAndMergeRunner [Map_1]] 
 orderedgrouped.MergeManager: GOPAL: onDiskBytes = 1095828683 += 
 365298784 for /grid/5/cluster/yarn/local/usercache/gopal/appcache/application_1429683757595_0141/attempt_1429683757595_0141_1_01_000143_0_10027_spill_404.out
 {code}
 The multiple instances of 404.out indicate that we pulled pipelined 
 chunks of the same shuffle src id multiple times: once into memory and twice onto disk.
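 The collision can be seen with a toy model of the path construction implied by the snippet above: if the file name is derived only from the src-task-id (the helper below is hypothetical, not Tez's actual `getInputFileForWrite`), two merge passes over the same source produce the same path.

```java
// Toy model of the faulty naming scheme: the output path depends only on the
// source task id, so two merge passes over the same source collide.
class SpillPathDemo {
    static String inputFileForWrite(int srcTaskId) {
        return "attempt_..._spill_" + srcTaskId + ".out";
    }

    public static void main(String[] args) {
        // First merge of src-task 404 (e.g. the in-memory merge flushed to disk).
        String first = inputFileForWrite(404);
        // Second merge of another pipelined chunk from the same src-task 404.
        String second = inputFileForWrite(404);
        // Identical paths: the second merge clobbers (or re-reads) the first.
        System.out.println(first.equals(second));
    }
}
```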
 {code}
 2015-04-23 03:28:08,256 INFO 
 [TezTaskEventRouter[attempt_1429683757595_0141_1_01_000143_0]] 
 orderedgrouped.ShuffleInputEventHandlerOrderedGrouped: DME srcIdx: 143, 
 targetIdx: 404, attemptNum: 0, payload: [hasEmptyPartitions: true, host: 
 cn047-10.l42scl.hortonworks.com, port: 13562, pathComponent: 
 attempt_1429683757595_0141_1_00_000404_0_10009_0, runDuration: 0]
 2015-04-23 03:28:08,270 INFO 
 [TezTaskEventRouter[attempt_1429683757595_0141_1_01_000143_0]] 
 orderedgrouped.ShuffleInputEventHandlerOrderedGrouped: DME srcIdx: 143, 
 targetIdx: 404, attemptNum: 0, payload: [hasEmptyPartitions: true, host: 
 cn047-10.l42scl.hortonworks.com, port: 13562, pathComponent: 
 attempt_1429683757595_0141_1_00_000404_0_10009_1, runDuration: 0]
 2015-04-23 03:28:08,272 INFO 
 [TezTaskEventRouter[attempt_1429683757595_0141_1_01_000143_0]] 
 orderedgrouped.ShuffleInputEventHandlerOrderedGrouped: DME srcIdx: 143, 
 targetIdx: 404, attemptNum: 0, payload: [hasEmptyPartitions: true, host: 
 cn047-10.l42scl.hortonworks.com, port: 13562, pathComponent: 
 attempt_1429683757595_0141_1_00_000404_0_10009_2, runDuration: 0]
 {code}
 This will fail depending on how many times _404_0 is at the top of the 

[jira] [Updated] (TEZ-2358) Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task

2015-04-27 Thread Gopal V (JIRA)

 [ 
https://issues.apache.org/jira/browse/TEZ-2358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gopal V updated TEZ-2358:
-
Priority: Blocker  (was: Major)

 Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task
 -

 Key: TEZ-2358
 URL: https://issues.apache.org/jira/browse/TEZ-2358
 Project: Apache Tez
  Issue Type: Bug
Affects Versions: 0.7.0
Reporter: Gopal V
Assignee: Rajesh Balamohan
Priority: Blocker
 Attachments: TEZ-2358.1.patch, TEZ-2358.2.patch, TEZ-2358.3.patch, 
 TEZ-2358.4.patch, syslog_attempt_1429683757595_0141_1_01_000143_0.syslog.bz2



[jira] [Updated] (TEZ-2358) Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task

2015-04-27 Thread Rajesh Balamohan (JIRA)

 [ 
https://issues.apache.org/jira/browse/TEZ-2358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajesh Balamohan updated TEZ-2358:
--
Attachment: TEZ-2358.4.patch

Sure, addressing review comments in the latest patch.

 Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task
 -

 Key: TEZ-2358
 URL: https://issues.apache.org/jira/browse/TEZ-2358
 Project: Apache Tez
  Issue Type: Bug
Affects Versions: 0.7.0
Reporter: Gopal V
Assignee: Rajesh Balamohan
 Attachments: TEZ-2358.1.patch, TEZ-2358.2.patch, TEZ-2358.3.patch, 
 TEZ-2358.4.patch, syslog_attempt_1429683757595_0141_1_01_000143_0.syslog.bz2



[jira] [Updated] (TEZ-2358) Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task

2015-04-24 Thread Rajesh Balamohan (JIRA)

 [ 
https://issues.apache.org/jira/browse/TEZ-2358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajesh Balamohan updated TEZ-2358:
--
Attachment: TEZ-2358.1.patch

Uploading the right patch.

 Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task
 -

 Key: TEZ-2358
 URL: https://issues.apache.org/jira/browse/TEZ-2358
 Project: Apache Tez
  Issue Type: Bug
Affects Versions: 0.7.0
Reporter: Gopal V
Assignee: Rajesh Balamohan
 Attachments: TEZ-2358.1.patch, 
 syslog_attempt_1429683757595_0141_1_01_000143_0.syslog.bz2



[jira] [Updated] (TEZ-2358) Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task

2015-04-24 Thread Rajesh Balamohan (JIRA)

 [ 
https://issues.apache.org/jira/browse/TEZ-2358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajesh Balamohan updated TEZ-2358:
--
Attachment: TEZ-2358.1.patch

Since the spill_Id was missing, it ended up clobbering the files.  Attaching the .1 
patch, which makes the file name unique using srcId & spillId.

[~gopalv], [~sseth] Please have a look at the patch. MAX_VALUE in the patch 
would not cause a collision, as it is handled in finalMerge (which is invoked 
exactly once during merge close) and generates exactly one file with that 
spill id during the final merge.
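As a rough sketch of that scheme (the helper below is hypothetical, not the actual patch): deriving the name from both srcId and spillId keeps each pipelined merge output distinct, while a reserved sentinel such as Integer.MAX_VALUE can safely name the single final-merge file.

```java
// Hypothetical sketch of the fix described above: the file name embeds both
// the source task id and the spill id, so repeated merges over the same
// source no longer clobber each other.
class UniqueSpillPath {
    static String inputFileForWrite(int srcId, int spillId) {
        return "src_" + srcId + "_spill_" + spillId + ".out";
    }

    public static void main(String[] args) {
        // Two merges over src-task 404, distinguished by their spill id.
        System.out.println(inputFileForWrite(404, 0));
        System.out.println(inputFileForWrite(404, 1));
        // The final merge uses a reserved spill id; since finalMerge runs
        // exactly once, only one file ever carries this sentinel.
        System.out.println(inputFileForWrite(404, Integer.MAX_VALUE));
    }
}
```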

 Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task
 -

 Key: TEZ-2358
 URL: https://issues.apache.org/jira/browse/TEZ-2358
 Project: Apache Tez
  Issue Type: Bug
Affects Versions: 0.7.0
Reporter: Gopal V
Assignee: Rajesh Balamohan
 Attachments: TEZ-2358.1.patch, 
 syslog_attempt_1429683757595_0141_1_01_000143_0.syslog.bz2



[jira] [Updated] (TEZ-2358) Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task

2015-04-23 Thread Gopal V (JIRA)

 [ 
https://issues.apache.org/jira/browse/TEZ-2358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gopal V updated TEZ-2358:
-
Summary: Pipelined Shuffle: MergeManager assumptions about 1 merge per 
source-task  (was: Fix MergeManager assumptions about 1 merge per source-task)

 Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task
 -

 Key: TEZ-2358
 URL: https://issues.apache.org/jira/browse/TEZ-2358
 Project: Apache Tez
  Issue Type: Bug
Affects Versions: 0.7.0
Reporter: Gopal V
Assignee: Rajesh Balamohan
 Attachments: 
 syslog_attempt_1429683757595_0141_1_01_000143_0.syslog.bz2

