Okay, so I tried to change:
byte[] mapInputPayload =
MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
MRHelpers.addMRInput(mapVertex, mapInputPayload, null);
MRHelpers.addMROutputLegacy(mapVertex, mapPayload);
to the suggested:
DataSourceDescriptor dataSource =
MRInputLegacy.createConfigurer(configuration, MyInputFormat.class).create();
mapVertex.addDataSource("initialmapinput", dataSource);
DataSinkDescriptor dataSink =
MROutputLegacy.createtConfigurer(configuration,
NullOutputFormat.class).create();
mapVertex.addDataSink("initialmapoutput", dataSink);
But it appears that the AM is now calling MyInputFormat.getSplits(),
which it never did before. This breaks my application, which expects
getSplits() to be called on the client when
MRHelpers.generateInputSplits(...) is invoked. Using MRInput instead of
MRInputLegacy did not help.
The stack trace of the getSplits() call is:
at org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat.getSplits(TezGroupedSplitsInputFormat.java:102)
at org.apache.tez.mapreduce.hadoop.MRHelpers.generateNewSplits(MRHelpers.java:267)
at org.apache.tez.mapreduce.hadoop.MRHelpers.generateInputSplitsToMem(MRHelpers.java:460)
at org.apache.tez.mapreduce.common.MRInputAMSplitGenerator.initialize(MRInputAMSplitGenerator.java:108)
at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:175)
at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:169)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554)
at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:169)
at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:156)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
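
As background for the ClassCastException in the original report quoted
below, here is a minimal, self-contained Java sketch of how a grouping
wrapper can fail when it is handed a split that was never wrapped. Every
class here (Split, PlainSplit, GroupedSplit, GroupedSplitCastSketch) is a
hypothetical stand-in, not the Tez API. It also assumes that grouping was
enabled on the task side while the splits were generated ungrouped on the
client, which this thread does not confirm.

```java
import java.util.Arrays;
import java.util.List;

public class GroupedSplitCastSketch {
    // Hypothetical stand-in for an input split; not a Hadoop or Tez type.
    interface Split {}

    // Stand-in for an application split generated without grouping.
    static final class PlainSplit implements Split {
        final String name;
        PlainSplit(String name) { this.name = name; }
    }

    // Stand-in for a grouped split that wraps several underlying splits.
    static final class GroupedSplit implements Split {
        final List<Split> wrapped;
        GroupedSplit(List<Split> wrapped) { this.wrapped = wrapped; }
    }

    // Assumes every incoming split is grouped, so an ungrouped split
    // fails here with a ClassCastException.
    static int openUnchecked(Split split) {
        GroupedSplit grouped = (GroupedSplit) split;
        return grouped.wrapped.size();
    }

    // Defensive variant: treats an ungrouped split as a group of one.
    static int openChecked(Split split) {
        if (split instanceof GroupedSplit) {
            return ((GroupedSplit) split).wrapped.size();
        }
        return 1;
    }

    // Reports whether the unchecked path throws for the given split.
    static boolean failsWithClassCast(Split split) {
        try {
            openUnchecked(split);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Split plain = new PlainSplit("storage-split-0");
        Split grouped = new GroupedSplit(
                Arrays.asList(plain, new PlainSplit("storage-split-1")));

        System.out.println(failsWithClassCast(plain));   // true
        System.out.println(failsWithClassCast(grouped)); // false
        System.out.println(openChecked(plain));          // 1
        System.out.println(openChecked(grouped));        // 2
    }
}
```

The point of the sketch is only that the side producing the splits and
the side reading them have to agree on whether grouping is in effect.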
- Thad
On Sat, Aug 9, 2014 at 7:55 PM, Bikas Saha <[email protected]> wrote:
> The following change is correct. However, this MRHelpers method is
> going to disappear soon. We recommend you switch to the
> MRInput.createConfigurer() and MROutput.createConfigurer() methods. Also,
> switch to the *EdgeConfigurer methods, e.g. OrderedPartitionedEdgeConfigurer,
> for edge configuration. Please look at WordCount or OrderedWordCount for
> example code.
>
> byte[] mapInputPayload =
> MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
> JobContextInputFormat.class.getName());
>
> To
>
> byte[] mapInputPayload =
> MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
>
> *From:* Thaddeus Diamond [mailto:[email protected]]
> *Sent:* Saturday, August 09, 2014 4:10 PM
> *To:* [email protected]
> *Subject:* TezGroupedSplit ClassCastException
>
>
>
> After changeset b0c87d9 my Tez DAG is now broken. In the logs I'm seeing:
>
>
>
> TaskAttempt 3 failed, info=[Error: Failure while running
> task:java.lang.ClassCastException:
> com.hadapt.de.storage.hadoop.StorageJobInputSplit cannot be cast to
> org.apache.hadoop.mapreduce.split.TezGroupedSplit
> at org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat.createRecordReader(TezGroupedSplitsInputFormat.java:111)
> at org.apache.tez.mapreduce.lib.MRReaderMapReduce.setupNewRecordReader(MRReaderMapReduce.java:148)
> at org.apache.tez.mapreduce.lib.MRReaderMapReduce.<init>(MRReaderMapReduce.java:78)
> at org.apache.tez.mapreduce.input.MRInput.initializeInternal(MRInput.java:327)
> at org.apache.tez.mapreduce.input.MRInputLegacy.init(MRInputLegacy.java:109)
> at org.apache.tez.mapreduce.processor.map.MapProcessor.run(MapProcessor.java:103)
> at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
> at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:180)
> at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:172)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554)
> at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:172)
> at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:167)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
> ]], Vertex failed as one or more tasks failed. failedTasks:1]
>
>
>
> In order to get my DAG to compile I had to change the following line:
>
>
>
> byte[] mapInputPayload =
> MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
> JobContextInputFormat.class.getName());
>
>
>
> To
>
>
>
> byte[] mapInputPayload =
> MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
>
>
>
> I think the relevant change in the Tez project's commit is:
>
>
>
> private static byte[] createMRInputPayload(ByteString bytes,
> - MRSplitsProto mrSplitsProto, String inputFormatName) throws IOException {
> + MRSplitsProto mrSplitsProto, boolean isGrouped) throws IOException {
> MRInputUserPayloadProto.Builder userPayloadBuilder = MRInputUserPayloadProto
> .newBuilder();
> userPayloadBuilder.setConfigurationBytes(bytes);
> if (mrSplitsProto != null) {
> userPayloadBuilder.setSplits(mrSplitsProto);
> }
> - if (inputFormatName!=null) {
> - userPayloadBuilder.setInputFormatName(inputFormatName);
> - }
> + userPayloadBuilder.setGroupingEnabled(isGrouped);
> // TODO Should this be a ByteBuffer or a byte array ? A ByteBuffer would be
> // more efficient.
> return userPayloadBuilder.build().toByteArray();
> }
> -
> +
>
>
>
> Please advise.
>
>
>
> Thanks,
>
> Thad
>