After changeset b0c87d9, my Tez DAG is broken. The logs show:

TaskAttempt 3 failed, info=[Error: Failure while running task:java.lang.ClassCastException: com.hadapt.de.storage.hadoop.StorageJobInputSplit cannot be cast to org.apache.hadoop.mapreduce.split.TezGroupedSplit
	at org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat.createRecordReader(TezGroupedSplitsInputFormat.java:111)
	at org.apache.tez.mapreduce.lib.MRReaderMapReduce.setupNewRecordReader(MRReaderMapReduce.java:148)
	at org.apache.tez.mapreduce.lib.MRReaderMapReduce.<init>(MRReaderMapReduce.java:78)
	at org.apache.tez.mapreduce.input.MRInput.initializeInternal(MRInput.java:327)
	at org.apache.tez.mapreduce.input.MRInputLegacy.init(MRInputLegacy.java:109)
	at org.apache.tez.mapreduce.processor.map.MapProcessor.run(MapProcessor.java:103)
	at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
	at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:180)
	at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:172)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554)
	at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:172)
	at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:167)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)
]], Vertex failed as one or more tasks failed. failedTasks:1]
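
My reading of the trace is that MRInput wrapped my input format in TezGroupedSplitsInputFormat, whose createRecordReader appears to cast the split it receives to TezGroupedSplit, but it was handed an ungrouped StorageJobInputSplit. Here is a minimal, self-contained model of that failure mode, just to make sure I am describing it correctly. Split, PlainSplit, GroupedSplit and createRecordReader are hypothetical stand-ins, not the real Tez or Hadapt classes:

```java
import java.util.Collections;
import java.util.List;

public class GroupedSplitCastDemo {
    // Hypothetical stand-ins: Split ~ InputSplit, PlainSplit ~ StorageJobInputSplit,
    // GroupedSplit ~ TezGroupedSplit. None of these are real Tez/Hadapt types.
    interface Split {}

    static final class PlainSplit implements Split {}

    static final class GroupedSplit implements Split {
        final List<Split> members;
        GroupedSplit(List<Split> members) { this.members = members; }
    }

    // Models a grouping wrapper that assumes every split it receives was
    // produced by a grouping step, so it downcasts without checking.
    static int createRecordReader(Split split) {
        GroupedSplit grouped = (GroupedSplit) split; // ClassCastException if split was never grouped
        return grouped.members.size();               // number of underlying splits to read
    }

    public static void main(String[] args) {
        Split plain = new PlainSplit();

        // A grouped split is accepted by the wrapper.
        System.out.println(createRecordReader(new GroupedSplit(Collections.singletonList(plain))));

        // An ungrouped split reproduces the failure mode seen in the trace.
        try {
            createRecordReader(plain);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```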

To get my DAG to compile, I had to change the following line:

byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
    JobContextInputFormat.class.getName());

To

byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload);

I think the relevant part of the Tez commit is:

   private static byte[] createMRInputPayload(ByteString bytes,
-      MRSplitsProto mrSplitsProto, String inputFormatName) throws IOException {
+      MRSplitsProto mrSplitsProto, boolean isGrouped) throws IOException {
     MRInputUserPayloadProto.Builder userPayloadBuilder = MRInputUserPayloadProto
         .newBuilder();
     userPayloadBuilder.setConfigurationBytes(bytes);
     if (mrSplitsProto != null) {
       userPayloadBuilder.setSplits(mrSplitsProto);
     }
-    if (inputFormatName!=null) {
-      userPayloadBuilder.setInputFormatName(inputFormatName);
-    }


+    userPayloadBuilder.setGroupingEnabled(isGrouped);

     // TODO Should this be a ByteBuffer or a byte array ? A ByteBuffer would be
     // more efficient.
     return userPayloadBuilder.build().toByteArray();
   }
-
+
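
If I read the diff correctly, the payload no longer carries an input format name; it only records whether grouping is enabled, and I am assuming that createMRInputPayloadWithGrouping(mapPayload) sets that flag to true. A rough model of why the flag and the splits then have to agree is below. InputPayload, readerFor and isConsistent are hypothetical names for illustration, not Tez APIs:

```java
import java.util.Collections;
import java.util.List;

public class GroupingFlagModel {
    // Hypothetical stand-ins for InputSplit, a storage split and TezGroupedSplit.
    interface Split {}

    static final class PlainSplit implements Split {}

    static final class GroupedSplit implements Split {
        final List<Split> members;
        GroupedSplit(List<Split> members) { this.members = members; }
    }

    // Models the payload after the change: a grouping flag instead of an
    // input format name (assumption based on setGroupingEnabled in the diff).
    static final class InputPayload {
        final boolean groupingEnabled;
        InputPayload(boolean groupingEnabled) { this.groupingEnabled = groupingEnabled; }
    }

    // Models the runtime choice of reader, driven only by the flag.
    static String readerFor(InputPayload payload) {
        return payload.groupingEnabled ? "grouping wrapper" : "configured input format";
    }

    // The grouping wrapper only works if the splits were actually grouped,
    // and the plain reader expects ungrouped splits.
    static boolean isConsistent(InputPayload payload, Split split) {
        return payload.groupingEnabled == (split instanceof GroupedSplit);
    }

    public static void main(String[] args) {
        Split ungrouped = new PlainSplit();
        Split grouped = new GroupedSplit(Collections.singletonList(ungrouped));
        InputPayload withGrouping = new InputPayload(true);
        InputPayload withoutGrouping = new InputPayload(false);

        // My case, as I read the trace: flag set, split never grouped.
        System.out.println(readerFor(withGrouping) + ", consistent: "
                + isConsistent(withGrouping, ungrouped));
        System.out.println(readerFor(withGrouping) + ", consistent: "
                + isConsistent(withGrouping, grouped));
        System.out.println(readerFor(withoutGrouping) + ", consistent: "
                + isConsistent(withoutGrouping, ungrouped));
    }
}
```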

Please advise.

Thanks,
Thad
