First of all, MRInputLegacy/MROutputLegacy are not recommended. They exist only for applications that have used MR in hacky ways that we don’t want to support.
Coming to the main question: were you using grouping of splits earlier? If yes, then MyInputFormat would have been called inside the AM even then, so things should not be different now. If you were not using grouping earlier, were you generating the splits on the client and then distributing them to tasks via the AM (i.e. did your AM run MRInputSplitDistributor)? If so, then in the configurer you can specify generateSplitsInAM(false). This will generate the splits in the client and distribute them to tasks in the AM.

Bikas

*From:* Thaddeus Diamond [mailto:[email protected]]
*Sent:* Sunday, August 10, 2014 3:40 PM
*To:* [email protected]
*Subject:* Re: TezGroupedSplit ClassCastException

Okay, so I tried to change:

    byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
    MRHelpers.addMRInput(mapVertex, mapInputPayload, null);
    MRHelpers.addMROutputLegacy(mapVertex, mapPayload);

to the suggested:

    DataSourceDescriptor dataSource = MRInputLegacy.createConfigurer(configuration, MyInputFormat.class).create();
    mapVertex.addDataSource("initialmapinput", dataSource);
    DataSinkDescriptor dataSink = MROutputLegacy.createConfigurer(configuration, NullOutputFormat.class).create();
    mapVertex.addDataSink("initialmapoutput", dataSink);

But it appears that the AM is now calling MyInputFormat.getSplits(), which it never used to do. This causes an application issue, since my application expects getSplits() to be called from the client when MRHelpers.generateInputSplits(...) is called. Using MRInput instead of MRInputLegacy did not help.
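Bikas's reply distinguishes two things the AM can do with a root input. It can call the input format's getSplits() itself (the MRInputAMSplitGenerator frame in Thad's stack trace), or it can merely hand out splits that were already computed on the client (MRInputSplitDistributor). The following is a minimal toy model of that distinction in plain Java. Every class and method name in it is hypothetical, and none of it is Tez API. It assumes, as the reply states, that generateSplitsInAM(false) selects client-side generation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, NOT Tez code. It only illustrates the two AM-side
// behaviours described in the reply above.

// Stand-in for an InputFormat that records who called getSplits().
final class CountingInputFormat {
    final List<String> callers = new ArrayList<>();

    List<String> getSplits(String caller) {
        callers.add(caller);
        return List.of("split-0", "split-1");
    }
}

// Stand-in for the AM-side initializer of a root input.
interface AmInputInitializer {
    List<String> initialize();
}

// Analogous to MRInputAMSplitGenerator: getSplits() runs inside the AM.
final class GenerateInAm implements AmInputInitializer {
    private final CountingInputFormat format;

    GenerateInAm(CountingInputFormat format) { this.format = format; }

    @Override
    public List<String> initialize() { return format.getSplits("AM"); }
}

// Analogous to MRInputSplitDistributor: the AM only hands out client splits.
final class DistributeClientSplits implements AmInputInitializer {
    private final List<String> clientSplits;

    DistributeClientSplits(List<String> clientSplits) { this.clientSplits = clientSplits; }

    @Override
    public List<String> initialize() { return clientSplits; }
}

final class SplitModeDemo {
    // generateSplitsInAM mirrors the configurer flag mentioned in the reply.
    static AmInputInitializer choose(boolean generateSplitsInAM, CountingInputFormat format) {
        if (generateSplitsInAM) {
            return new GenerateInAm(format);
        }
        // Client-side generation happens here, before the "DAG" is submitted.
        return new DistributeClientSplits(format.getSplits("client"));
    }

    public static void main(String[] args) {
        for (boolean inAm : new boolean[] {true, false}) {
            CountingInputFormat format = new CountingInputFormat();
            List<String> splits = choose(inAm, format).initialize();
            System.out.println("generateSplitsInAM=" + inAm + " -> getSplits() called from "
                    + format.callers + ", tasks get " + splits);
        }
    }
}
```

In the model, the distributor never touches the input format inside the AM. That is why client-side generation preserves an application's assumption that getSplits() runs on the client.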
The stacktrace of the getSplits() call is:

    at org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat.getSplits(TezGroupedSplitsInputFormat.java:102)
    at org.apache.tez.mapreduce.hadoop.MRHelpers.generateNewSplits(MRHelpers.java:267)
    at org.apache.tez.mapreduce.hadoop.MRHelpers.generateInputSplitsToMem(MRHelpers.java:460)
    at org.apache.tez.mapreduce.common.MRInputAMSplitGenerator.initialize(MRInputAMSplitGenerator.java:108)
    at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:175)
    at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:169)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554)
    at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:169)
    at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:156)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

- Thad

On Sat, Aug 9, 2014 at 7:55 PM, Bikas Saha <[email protected]> wrote:

The following change is correct. However, these MRHelpers methods are soon going to disappear. We recommend that you switch to the MRInput.createConfigurer() and MROutput.createConfigurer() methods. Also, switch to the *EdgeConfigurer methods, e.g. OrderedPartitionedEdgeConfigurer, for edge configuration. Please look at WordCount or OrderedWordCount for example code.
    byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload, JobContextInputFormat.class.getName());

to

    byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload);

*From:* Thaddeus Diamond [mailto:[email protected]]
*Sent:* Saturday, August 09, 2014 4:10 PM
*To:* [email protected]
*Subject:* TezGroupedSplit ClassCastException

After changeset b0c87d9 my Tez DAG is now broken. In the logs I'm seeing:

    TaskAttempt 3 failed, info=[Error: Failure while running task:java.lang.ClassCastException: com.hadapt.de.storage.hadoop.StorageJobInputSplit cannot be cast to org.apache.hadoop.mapreduce.split.TezGroupedSplit
    at org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat.createRecordReader(TezGroupedSplitsInputFormat.java:111)
    at org.apache.tez.mapreduce.lib.MRReaderMapReduce.setupNewRecordReader(MRReaderMapReduce.java:148)
    at org.apache.tez.mapreduce.lib.MRReaderMapReduce.<init>(MRReaderMapReduce.java:78)
    at org.apache.tez.mapreduce.input.MRInput.initializeInternal(MRInput.java:327)
    at org.apache.tez.mapreduce.input.MRInputLegacy.init(MRInputLegacy.java:109)
    at org.apache.tez.mapreduce.processor.map.MapProcessor.run(MapProcessor.java:103)
    at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
    at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:180)
    at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:172)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554)
    at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:172)
    at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:167)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
    ]], Vertex failed as one or more tasks failed. failedTasks:1]

In order to get my DAG to compile, I had to change the following line:

    byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload, JobContextInputFormat.class.getName());

to

    byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload);

I think the relevant change in the Tez project's commit is:

       private static byte[] createMRInputPayload(ByteString bytes,
    -      MRSplitsProto mrSplitsProto, String inputFormatName) throws IOException {
    +      MRSplitsProto mrSplitsProto, boolean isGrouped) throws IOException {
         MRInputUserPayloadProto.Builder userPayloadBuilder = MRInputUserPayloadProto
             .newBuilder();
         userPayloadBuilder.setConfigurationBytes(bytes);
         if (mrSplitsProto != null) {
           userPayloadBuilder.setSplits(mrSplitsProto);
         }
    -    if (inputFormatName!=null) {
    -      userPayloadBuilder.setInputFormatName(inputFormatName);
    -    }
    +    userPayloadBuilder.setGroupingEnabled(isGrouped);
         // TODO Should this be a ByteBuffer or a byte array ? A ByteBuffer would be
         // more efficient.
         return userPayloadBuilder.build().toByteArray();
       }
    -
    +

Please advise.

Thanks,
Thad

CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to which it is addressed and may contain information that is confidential, privileged and exempt from disclosure under applicable law. If the reader of this message is not the intended recipient, you are hereby notified that any printing, copying, dissemination, distribution, disclosure or forwarding of this communication is strictly prohibited. If you have received this communication in error, please contact the sender immediately and delete it from your system. Thank You.
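Returning to the ClassCastException in Thad's first message, one plausible reading fits the commit diff he quotes, where the payload now carries a `groupingEnabled` flag instead of an input format name. `createMRInputPayloadWithGrouping(mapPayload)` presumably marks the input as grouped, so the task-side reader expects `TezGroupedSplit` objects. The splits generated on the client, however, were plain `StorageJobInputSplit`s, as the exception message shows. The toy Java below models only that downcast. `Split`, `RawSplit`, `GroupedSplit`, `ReaderFactory` and `CastDemo` are hypothetical stand-ins, not Tez or Hadoop classes.

```java
import java.util.List;

// Hypothetical stand-ins, NOT the real Tez or Hadoop classes.
interface Split { }

// Plays the role of an application split such as StorageJobInputSplit.
final class RawSplit implements Split {
    final String path;

    RawSplit(String path) { this.path = path; }
}

// Plays the role of TezGroupedSplit: one split wrapping several raw splits.
final class GroupedSplit implements Split {
    final List<Split> members;

    GroupedSplit(List<Split> members) { this.members = members; }
}

final class ReaderFactory {
    // When the payload says "grouped", this reader downcasts to the group type.
    // The stack trace suggests a similar cast in
    // TezGroupedSplitsInputFormat.createRecordReader.
    static String open(Split split, boolean groupingEnabled) {
        if (groupingEnabled) {
            GroupedSplit group = (GroupedSplit) split; // ClassCastException for a RawSplit
            return "group of " + group.members.size();
        }
        return "raw " + ((RawSplit) split).path;
    }
}

final class CastDemo {
    public static void main(String[] args) {
        Split raw = new RawSplit("part-0");
        // Consistent combinations of flag and split type work.
        System.out.println(ReaderFactory.open(raw, false));
        System.out.println(ReaderFactory.open(new GroupedSplit(List.of(raw)), true));
        // Grouping flag set, but the split was never grouped.
        try {
            ReaderFactory.open(raw, true);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

In the model, the failure disappears as soon as the grouping flag and the actual split objects agree.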
