OK. The updateLocalResources method is being made private as part of
TEZ-1407. Also, many of the MRHelpers methods are going away. Moving to the
MRInputConfigurer, or to the new method being added via TEZ-1407 (client side,
but it may not be supported for long), would be the way forward.
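
For anyone reading this thread later, here is a minimal sketch of why the original ClassCastException probably shows up. It assumes (based only on the stack trace quoted below) that the task side casts every incoming split to the grouped wrapper type whenever the payload's grouping flag is set, so splits generated ungrouped on the client fail the cast. The class and method names in the sketch (RawSplit, GroupedSplit, tryRead, and so on) are hypothetical stand-ins for illustration, not Tez classes or APIs.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT Tez classes.
public class GroupingMismatchSketch {
    interface Split {}

    // Plays the role of an application split such as StorageJobInputSplit.
    static final class RawSplit implements Split {
        final String path;
        RawSplit(String path) { this.path = path; }
    }

    // Plays the role of a grouped split: a wrapper around several raw splits.
    static final class GroupedSplit implements Split {
        final List<RawSplit> wrapped;
        GroupedSplit(List<RawSplit> wrapped) { this.wrapped = wrapped; }
    }

    // Task-side reader setup (assumed behaviour). When the payload says
    // grouping is enabled, the split is cast to GroupedSplit unconditionally.
    static int countUnderlyingSplits(Split split, boolean groupingEnabled) {
        if (groupingEnabled) {
            GroupedSplit grouped = (GroupedSplit) split; // fails for a RawSplit
            return grouped.wrapped.size();
        }
        return 1; // an ungrouped split is read as-is
    }

    // Returns a short description of what happened for the given flag.
    static String tryRead(Split split, boolean groupingEnabled) {
        try {
            return "ok:" + countUnderlyingSplits(split, groupingEnabled);
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        Split clientSplit = new RawSplit("/data/part-0");
        // Grouping flag set, but the split was generated ungrouped on the client.
        System.out.println(tryRead(clientSplit, true));   // prints "ClassCastException"
        // Grouping flag not set: the raw split is accepted.
        System.out.println(tryRead(clientSplit, false));  // prints "ok:1"
        // Grouping flag set and the split really is a grouped wrapper.
        Split grouped = new GroupedSplit(
            Arrays.asList(new RawSplit("/a"), new RawSplit("/b")));
        System.out.println(tryRead(grouped, true));       // prints "ok:2"
    }
}
```

If that assumption holds, it would also explain why building the payload without grouping works for client-generated splits, as reported further down the thread.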


On Mon, Aug 11, 2014 at 8:58 AM, Thaddeus Diamond <
[email protected]> wrote:

> Hey Sid,
>
> I'm actually working with other devs who handle the input split side so I
> don't know quite what's in there, but they are not huge.  Because of the
> way we handle our application (Spring) it's easier to do all the input
> split management and construction up front for jobs.  However, to make your
> lives easier we're actually moving toward a native DAG implementation for
> these jobs so I don't anticipate the MRHelpers methods being needed much
> longer on our end.
>
> Yes, we do use the updateLocalResources...() method.
>
> - Thad
>
>
> On Mon, Aug 11, 2014 at 2:27 AM, Siddharth Seth <[email protected]> wrote:
>
>> Thaddeus,
>> Could you provide some more details on the steps to set up 'mapPayload'
>> itself? It looks like you need to generate splits on the client itself (it
>> would be interesting to know why this isn't possible on the AM). Were you
>> making use of "MRHelpers.updateLocalResourcesForInputSplits" earlier? If
>> so, the current set of MRInput/MRInputLegacy configurer methods don't
>> provide the same functionality.
>> However, the recommendation in this case is to use
>> generateSplitsInAM(false) as Bikas pointed out - a lot of the code to set up
>> MRInputPayload should go away after this. Do you happen to know the size of
>> the splits being generated? Do they have more data beyond the typical
>> "path, offset, size"?
>> Many of the APIs in MRHelpers are in the process of being removed or
>> simplified - hopefully this will stabilize more within a week or so.
>>
>> Thanks
>> - Sid
>>
>>
>> On Sun, Aug 10, 2014 at 8:17 PM, Thaddeus Diamond <
>> [email protected]> wrote:
>>
>>> Yeah I tried the normal MRInput/Output and it made no difference, I'll
>>> switch back to using them then.
>>>
>>> Like I said, all I had before was those three lines, with nothing about
>>> split distribution or AM configuration specified.  Just running
>>> with createMRInputPayload instead of the grouping API appears to be
>>> working, so perhaps I did not need grouping at all.
>>>
>>> With that API the getSplits() method is not called in the AM.
>>>
>>>
>>> On Sun, Aug 10, 2014 at 7:34 PM, Bikas Saha <[email protected]>
>>> wrote:
>>>
>>>> First of all MRInput/OutputLegacy is not recommended. They are present
>>>> only for applications that have used MR in hacky ways that we don’t want to
>>>> support.
>>>>
>>>>
>>>>
>>>> Coming to the main question: were you using grouping of splits earlier?
>>>> If yes, then MyInputFormat would have been called inside the AM even then,
>>>> so things should not be different now. If you were not using grouping
>>>> earlier, then were you generating the splits on the client and then
>>>> distributing them to tasks via the AM (did your AM run
>>>> MRInputSplitDistributor)? If so, then in the configurer you can specify
>>>> generateSplitsInAM(false). This will generate splits in the client and
>>>> distribute them to tasks in the AM.
>>>>
>>>>
>>>>
>>>> Bikas
>>>>
>>>>
>>>>
>>>> *From:* Thaddeus Diamond [mailto:[email protected]]
>>>> *Sent:* Sunday, August 10, 2014 3:40 PM
>>>> *To:* [email protected]
>>>> *Subject:* Re: TezGroupedSplit ClassCastException
>>>>
>>>>
>>>>
>>>> Okay,
>>>>
>>>>
>>>>
>>>> So I tried to change:
>>>>
>>>>
>>>>
>>>> byte[] mapInputPayload =
>>>> MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
>>>>
>>>> MRHelpers.addMRInput(mapVertex, mapInputPayload, null);
>>>>
>>>> MRHelpers.addMROutputLegacy(mapVertex, mapPayload);
>>>>
>>>>
>>>>
>>>> To the suggested
>>>>
>>>>
>>>>
>>>> DataSourceDescriptor dataSource =
>>>> MRInputLegacy.createConfigurer(configuration, 
>>>> MyInputFormat.class).create();
>>>>
>>>> mapVertex.addDataSource("initialmapinput", dataSource);
>>>>
>>>>
>>>>
>>>> DataSinkDescriptor dataSink =
>>>> MROutputLegacy.createConfigurer(configuration,
>>>> NullOutputFormat.class).create();
>>>>
>>>> mapVertex.addDataSink("initialmapoutput", dataSink);
>>>>
>>>>
>>>>
>>>> But it appears that the AM is trying to call MyInputFormat.getSplits()
>>>> which it never used to.  This is causing an application issue since my
>>>> application expects getSplits() to be called from the client when
>>>> MRHelpers.generateInputSplits(...) is called.  Using MRInput instead
>>>> of MRInputLegacy did not help.
>>>>
>>>>
>>>>
>>>> The stacktrace of the getSplits() call is:
>>>>
>>>>
>>>>
>>>>      at org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat.getSplits(TezGroupedSplitsInputFormat.java:102)
>>>>      at org.apache.tez.mapreduce.hadoop.MRHelpers.generateNewSplits(MRHelpers.java:267)
>>>>      at org.apache.tez.mapreduce.hadoop.MRHelpers.generateInputSplitsToMem(MRHelpers.java:460)
>>>>      at org.apache.tez.mapreduce.common.MRInputAMSplitGenerator.initialize(MRInputAMSplitGenerator.java:108)
>>>>      at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:175)
>>>>      at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:169)
>>>>      at java.security.AccessController.doPrivileged(Native Method)
>>>>      at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554)
>>>>      at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:169)
>>>>      at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:156)
>>>>      at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>      at java.lang.Thread.run(Thread.java:744)
>>>>
>>>>
>>>>
>>>> - Thad
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, Aug 9, 2014 at 7:55 PM, Bikas Saha <[email protected]>
>>>> wrote:
>>>>
>>>> The following change is correct. However, this MRHelpers method is
>>>> soon going to disappear. We recommend you switch to the
>>>> MRInput.createConfigurer() and MROutput.createConfigurer() methods. Also,
>>>> switch to the *EdgeConfigurer methods, e.g. OrderedPartitionedEdgeConfigurer,
>>>> for edge configuration. Please look at WordCount or OrderedWordCount for
>>>> example code.
>>>>
>>>>
>>>>
>>>> byte[] mapInputPayload =
>>>> MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
>>>> JobContextInputFormat.class.getName());
>>>>
>>>>
>>>>
>>>> To
>>>>
>>>>
>>>>
>>>> byte[] mapInputPayload =
>>>> MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> *From:* Thaddeus Diamond [mailto:[email protected]]
>>>> *Sent:* Saturday, August 09, 2014 4:10 PM
>>>> *To:* [email protected]
>>>> *Subject:* TezGroupedSplit ClassCastException
>>>>
>>>>
>>>>
>>>> After changeset b0c87d9 my Tez DAG is now broken.  In the logs I'm
>>>> seeing:
>>>>
>>>>
>>>>
>>>> TaskAttempt 3 failed, info=[Error: Failure while running
>>>> task:java.lang.ClassCastException:
>>>> com.hadapt.de.storage.hadoop.StorageJobInputSplit cannot be cast to
>>>> org.apache.hadoop.mapreduce.split.TezGroupedSplit
>>>>
>>>>      at org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat.createRecordReader(TezGroupedSplitsInputFormat.java:111)
>>>>      at org.apache.tez.mapreduce.lib.MRReaderMapReduce.setupNewRecordReader(MRReaderMapReduce.java:148)
>>>>      at org.apache.tez.mapreduce.lib.MRReaderMapReduce.<init>(MRReaderMapReduce.java:78)
>>>>      at org.apache.tez.mapreduce.input.MRInput.initializeInternal(MRInput.java:327)
>>>>      at org.apache.tez.mapreduce.input.MRInputLegacy.init(MRInputLegacy.java:109)
>>>>      at org.apache.tez.mapreduce.processor.map.MapProcessor.run(MapProcessor.java:103)
>>>>      at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
>>>>      at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:180)
>>>>      at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:172)
>>>>      at java.security.AccessController.doPrivileged(Native Method)
>>>>      at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554)
>>>>      at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:172)
>>>>      at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:167)
>>>>      at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>      at java.lang.Thread.run(Thread.java:744)
>>>>
>>>> ]], Vertex failed as one or more tasks failed. failedTasks:1]
>>>>
>>>>
>>>>
>>>> In order to get my DAG to compile I had to change the following line:
>>>>
>>>>
>>>>
>>>> byte[] mapInputPayload =
>>>> MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
>>>> JobContextInputFormat.class.getName());
>>>>
>>>>
>>>>
>>>> To
>>>>
>>>>
>>>>
>>>> byte[] mapInputPayload =
>>>> MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
>>>>
>>>>
>>>>
>>>> I think the relevant change in the Tez project's commit is:
>>>>
>>>>
>>>>
>>>>    private static byte[] createMRInputPayload(ByteString bytes,
>>>> -      MRSplitsProto mrSplitsProto, String inputFormatName) throws IOException {
>>>> +      MRSplitsProto mrSplitsProto, boolean isGrouped) throws IOException {
>>>>      MRInputUserPayloadProto.Builder userPayloadBuilder = MRInputUserPayloadProto
>>>>          .newBuilder();
>>>>      userPayloadBuilder.setConfigurationBytes(bytes);
>>>>      if (mrSplitsProto != null) {
>>>>        userPayloadBuilder.setSplits(mrSplitsProto);
>>>>      }
>>>> -    if (inputFormatName!=null) {
>>>> -      userPayloadBuilder.setInputFormatName(inputFormatName);
>>>> -    }
>>>> +    userPayloadBuilder.setGroupingEnabled(isGrouped);
>>>>
>>>>      // TODO Should this be a ByteBuffer or a byte array ? A ByteBuffer would be
>>>>      // more efficient.
>>>>      return userPayloadBuilder.build().toByteArray();
>>>>    }
>>>> -
>>>> +
>>>>
>>>>
>>>>
>>>> Please advise.
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Thad
>>>>
>>>>
>>>> CONFIDENTIALITY NOTICE
>>>> NOTICE: This message is intended for the use of the individual or
>>>> entity to which it is addressed and may contain information that is
>>>> confidential, privileged and exempt from disclosure under applicable law.
>>>> If the reader of this message is not the intended recipient, you are hereby
>>>> notified that any printing, copying, dissemination, distribution,
>>>> disclosure or forwarding of this communication is strictly prohibited. If
>>>> you have received this communication in error, please contact the sender
>>>> immediately and delete it from your system. Thank You.
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>
