Since changeset b0c87d9, my Tez DAG fails at runtime. In the logs I see:
TaskAttempt 3 failed, info=[Error: Failure while running task:java.lang.ClassCastException: com.hadapt.de.storage.hadoop.StorageJobInputSplit cannot be cast to org.apache.hadoop.mapreduce.split.TezGroupedSplit
    at org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat.createRecordReader(TezGroupedSplitsInputFormat.java:111)
    at org.apache.tez.mapreduce.lib.MRReaderMapReduce.setupNewRecordReader(MRReaderMapReduce.java:148)
    at org.apache.tez.mapreduce.lib.MRReaderMapReduce.<init>(MRReaderMapReduce.java:78)
    at org.apache.tez.mapreduce.input.MRInput.initializeInternal(MRInput.java:327)
    at org.apache.tez.mapreduce.input.MRInputLegacy.init(MRInputLegacy.java:109)
    at org.apache.tez.mapreduce.processor.map.MapProcessor.run(MapProcessor.java:103)
    at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
    at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:180)
    at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:172)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554)
    at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:172)
    at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:167)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
]], Vertex failed as one or more tasks failed. failedTasks:1]
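To check my reading of the trace, here is a toy model of the failure. All class names below are made up for illustration: they only mimic the roles of StorageJobInputSplit, TezGroupedSplit and TezGroupedSplitsInputFormat, and none of this is Tez's actual code. The point it shows is that a "grouped" input format which unconditionally casts each split to its grouping wrapper throws ClassCastException as soon as it is handed a raw, ungrouped split, which looks like what happens in createRecordReader at TezGroupedSplitsInputFormat.java:111.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of the failure in the stack trace.
// These are NOT the real Hadoop/Tez classes; they only mimic their roles.
public class GroupedCastDemo {

    public interface InputSplit {}

    // Stands in for a split produced by the job's own InputFormat
    // (StorageJobInputSplit in the trace).
    public static final class RawSplit implements InputSplit {}

    // Stands in for TezGroupedSplit: a wrapper around one or more raw splits.
    public static final class GroupedSplit implements InputSplit {
        public final List<InputSplit> wrapped;

        public GroupedSplit(List<InputSplit> wrapped) {
            this.wrapped = wrapped;
        }
    }

    // Stands in for TezGroupedSplitsInputFormat.createRecordReader: it casts
    // unconditionally, so a RawSplit triggers ClassCastException.
    // Returns the number of underlying splits the reader would iterate.
    public static int createRecordReader(InputSplit split) {
        GroupedSplit grouped = (GroupedSplit) split;
        return grouped.wrapped.size();
    }

    // True if the split survives the cast, false if ClassCastException is thrown.
    public static boolean accepts(InputSplit split) {
        try {
            createRecordReader(split);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        InputSplit raw = new RawSplit();
        InputSplit grouped = new GroupedSplit(Collections.singletonList(raw));
        System.out.println("grouped split accepted: " + accepts(grouped)); // true
        System.out.println("raw split accepted: " + accepts(raw));         // false
    }
}
```

In other words, the grouped reader and the splits it receives have to agree; if grouping is switched on for the reader but the splits were generated ungrouped, the cast fails exactly as in the log.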
In order to get my DAG to compile I had to change the following line:
    byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
        JobContextInputFormat.class.getName());
to:
    byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
I think the relevant part of that commit is this change to createMRInputPayload:
 private static byte[] createMRInputPayload(ByteString bytes,
-    MRSplitsProto mrSplitsProto, String inputFormatName) throws IOException {
+    MRSplitsProto mrSplitsProto, boolean isGrouped) throws IOException {
   MRInputUserPayloadProto.Builder userPayloadBuilder = MRInputUserPayloadProto
       .newBuilder();
   userPayloadBuilder.setConfigurationBytes(bytes);
   if (mrSplitsProto != null) {
     userPayloadBuilder.setSplits(mrSplitsProto);
   }
-  if (inputFormatName!=null) {
-    userPayloadBuilder.setInputFormatName(inputFormatName);
-  }
+  userPayloadBuilder.setGroupingEnabled(isGrouped);
   // TODO Should this be a ByteBuffer or a byte array ? A ByteBuffer would be
   // more efficient.
   return userPayloadBuilder.build().toByteArray();
 }
-
+
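Reduced to its effect on the payload, the change looks roughly like the toy model below. OldPayload, NewPayload, createOld and createNew are hypothetical names, and these are plain Java classes standing in for the generated MRInputUserPayloadProto, not the real protobuf code. Before the change the payload could name the InputFormat; after it, the payload only carries a grouping flag. My unconfirmed guess is that with groupingEnabled set, the runtime wraps the InputFormat in TezGroupedSplitsInputFormat, which would fit the trace if the splits my job ships were never grouped.

```java
// Hypothetical, simplified model of the payload change in the diff.
// Plain Java classes, NOT the generated MRInputUserPayloadProto.
public class PayloadChangeDemo {

    // Before the change: the payload could carry the InputFormat class name.
    public static final class OldPayload {
        public final byte[] configurationBytes;
        public final String inputFormatName; // null means "not set"

        OldPayload(byte[] configurationBytes, String inputFormatName) {
            this.configurationBytes = configurationBytes;
            this.inputFormatName = inputFormatName;
        }
    }

    // After the change: the payload only records whether grouping is enabled;
    // there is no field naming the InputFormat any more.
    public static final class NewPayload {
        public final byte[] configurationBytes;
        public final boolean groupingEnabled;

        NewPayload(byte[] configurationBytes, boolean groupingEnabled) {
            this.configurationBytes = configurationBytes;
            this.groupingEnabled = groupingEnabled;
        }
    }

    // Mirrors the removed branch: a null name simply leaves the field unset.
    public static OldPayload createOld(byte[] conf, String inputFormatName) {
        return new OldPayload(conf, inputFormatName);
    }

    // Mirrors the added line: the flag is always written.
    public static NewPayload createNew(byte[] conf, boolean isGrouped) {
        return new NewPayload(conf, isGrouped);
    }

    public static void main(String[] args) {
        byte[] conf = new byte[0];
        OldPayload before = createOld(conf, "JobContextInputFormat");
        NewPayload after = createNew(conf, true);
        System.out.println("before: inputFormatName=" + before.inputFormatName);
        System.out.println("after: groupingEnabled=" + after.groupingEnabled);
    }
}
```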
Please advise.
Thanks,
Thad