"Not sure your purpose. Usually data locality can improve performance" -
Mostly study purpose :)

Thanks for the pointers to the file to be changed. That helps very much!

Raajay

On Thu, Sep 10, 2015 at 3:15 AM, Jianfeng (Jeff) Zhang <
[email protected]> wrote:

> >>> I am trying to create a scenario where the mappers (root tasks) are
> deliberately not executed at the data location
> Not sure what your purpose is. Usually data locality improves performance.
>
>
> >>> Can the number of tasks for the tokenizer be a value *NOT* equal to
> the number of HDFS blocks of the file?
> Yes, it can. There are two ways:
> *  MRInput internally uses an InputFormat to determine how to split, so all
> the InputFormat settings apply to MRInput too,
>    like mapreduce.input.fileinputformat.split.minsize &
> mapreduce.input.fileinputformat.split.maxsize.
>
> *  The other way is to use TezGroupedSplitsInputFormat, which is provided by
> Tez. This InputFormat groups several splits together into a new split that is
> consumed by one mapper.
>    You can tune that with the following parameters (please refer to
> MRInputConfigBuilder.groupSplits); see the sketch after this list:
>
>    - tez.grouping.split-waves
>    - tez.grouping.max-size
>    - tez.grouping.min-size
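>
>    For example, here is a minimal, untested sketch of where these properties
> could be set, assuming the tezConf and inputPath variables from the WordCount
> driver; the numeric values are arbitrary examples, not recommendations:
>
>     import org.apache.hadoop.conf.Configuration;
>     import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
>     import org.apache.tez.dag.api.DataSourceDescriptor;
>     import org.apache.tez.dag.api.TezConfiguration;
>     import org.apache.tez.mapreduce.input.MRInput;
>
>     // (inside your Tez driver class)
>     DataSourceDescriptor buildDataSource(TezConfiguration tezConf, String inputPath) {
>       Configuration conf = new Configuration(tezConf);
>
>       // Way 1: bound the raw InputFormat splits (sizes in bytes).
>       conf.setLong("mapreduce.input.fileinputformat.split.minsize", 64L * 1024 * 1024);
>       conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 128L * 1024 * 1024);
>
>       // Way 2: let Tez group splits together; these knobs shape the grouping.
>       conf.setFloat("tez.grouping.split-waves", 1.5f);
>       conf.setLong("tez.grouping.min-size", 64L * 1024 * 1024);
>       conf.setLong("tez.grouping.max-size", 256L * 1024 * 1024);
>
>       // groupSplits(true) turns on the grouped-splits path.
>       return MRInput.createConfigBuilder(conf, TextInputFormat.class, inputPath)
>           .groupSplits(true)
>           .build();
>     }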
>
>
> >>>  Can a mapper be scheduled at a location different from the location
> of its input block? If so, how?
>     Yes, it is possible. Tez will always use the split info; there’s no
> option to disable it. If you really want to, you need to create a new
> InputInitializer. I think you just need to make a small change to
> MRInputAMSplitGenerator:
>
> https://github.com/zjffdu/tez/blob/a3a7700dea0a315ad613aa2d8a7223eb73878cb5/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
>
>
> You just need to make a small change in the following code snippet:
>
> InputConfigureVertexTasksEvent configureVertexEvent =
> InputConfigureVertexTasksEvent.create(
>
>         inputSplitInfo.getNumTasks(),
>
>         VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()),
>              // make code changes here
>
>         InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
>
>     events.add(configureVertexEvent);
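>
> For illustration, here is a rough, untested sketch of what that change might
> look like if you wanted to ignore the HDFS block locations and supply your
> own hosts instead. The myHosts list is a made-up placeholder, and you would
> need to import TaskLocationHint and the java.util classes into
> MRInputAMSplitGenerator:
>
>     int numTasks = inputSplitInfo.getNumTasks();
>
>     // Build one TaskLocationHint per task from your own host list
>     // (round-robin), instead of the locations carried by the splits.
>     List<String> myHosts = Arrays.asList("node1.example.com", "node2.example.com");
>     List<TaskLocationHint> customHints = new ArrayList<>(numTasks);
>     for (int i = 0; i < numTasks; i++) {
>       customHints.add(TaskLocationHint.createTaskLocationHint(
>           Collections.singleton(myHosts.get(i % myHosts.size())),
>           null)); // (hosts, racks)
>     }
>
>     InputConfigureVertexTasksEvent configureVertexEvent =
>         InputConfigureVertexTasksEvent.create(
>             numTasks,
>             VertexLocationHint.create(customHints), // custom hints replace the split locations
>             InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
>
>     events.add(configureVertexEvent);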
>
>
>
>
> Best Regard,
> Jeff Zhang
>
>
> From: Raajay <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, September 10, 2015 at 2:07 PM
>
> To: "[email protected]" <[email protected]>
> Subject: Re: Error of setting vertex location hints
>
> The input is an HDFS file. I am trying to create a scenario where the
> mappers (root tasks) are deliberately not executed at the data location. So
> for now, I chose the location hints for the tasks in a random fashion. I
> figured that by populating the VertexLocationHint with the addresses of
> random nodes, I could achieve this.
>
> This requires setting the parallelism to the number of elements in the
> VertexLocationHint, which led to the errors.
>
> Summarizing, for the word count example:
>
> 1. Can the number of tasks for the tokenizer be a value *NOT* equal to the
> number of HDFS blocks of the file?
>
> 2. Can a mapper be scheduled at a location different from the location of
> its input block? If so, how?
>
> Raajay
>
>
>
>
> On Thu, Sep 10, 2015 at 12:30 AM, Jianfeng (Jeff) Zhang <
> [email protected]> wrote:
>
>> >>> In the WordCount example, while creating the Tokenizer Vertex,
>> neither the parallelism nor the VertexLocationHint is specified. My guess
>> is that these values are populated at runtime, based on the InputInitializer.
>> Correct, the parallelism and the VertexLocationHint are set at runtime by
>> the InputInitializer.
>>
>> >>> What should I do so that the locations of the tasks for the Tokenizer
>> vertex are not based on HDFS splits but can be arbitrarily configured while
>> creating the DAG?
>> Do you mean your input is not an HDFS file? In that case I think you need
>> to create your own DataSourceDescriptor. You can refer to the
>> DataSourceDescriptor used by the WordCount example, shown below. If
>> possible, let us know more about your context. What kind of data is your
>> input? And how would you specify the VertexLocationHint for your input?
>>
>>     DataSourceDescriptor dataSource = MRInput.createConfigBuilder(
>>             new Configuration(tezConf), TextInputFormat.class, inputPath)
>>         .groupSplits(!isDisableSplitGrouping())
>>         .build();
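>>
>> For context, this is roughly how the WordCount example wires that data
>> source into the Tokenizer vertex (paraphrased from the example, not copied
>> verbatim). Note that no explicit parallelism is passed to Vertex.create, so
>> the InputInitializer created by MRInput is free to set the task count and
>> location hints at runtime:
>>
>>     // Vertex without an explicit parallelism; MRInput's InputInitializer
>>     // determines the task count and location hints from the splits.
>>     Vertex tokenizerVertex = Vertex.create("Tokenizer",
>>         ProcessorDescriptor.create(TokenProcessor.class.getName()));
>>     tokenizerVertex.addDataSource("Input", dataSource);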
>>
>>
>>
>> Best Regard,
>> Jeff Zhang
>>
>>
>> From: Raajay <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Thursday, September 10, 2015 at 1:10 PM
>> To: "[email protected]" <[email protected]>
>> Subject: Re: Error of setting vertex location hints
>>
>> I am just getting started with understanding the Tez code, so bear with
>> me; I might be wrong here.
>>
>> In the WordCount example, while creating the Tokenizer Vertex, neither
>> the parallelism nor the VertexLocationHint is specified. My guess is that
>> these values are populated at runtime, based on the InputInitializer.
>>
>> However, I do not want them to be populated at runtime, but rather want
>> them specified while creating the DAG itself. When I do that, I get the
>> exception mentioned in the previous mail.
>>
>> What should I do so that the locations of the tasks for the Tokenizer
>> vertex are not based on HDFS splits but can be arbitrarily configured while
>> creating the DAG?
>>
>> Raajay
>>
>>
>>
>> On Thu, Sep 10, 2015 at 12:01 AM, Jianfeng (Jeff) Zhang <
>> [email protected]> wrote:
>>
>>>
>>> Actually, the Tokenizer vertex should already have VertexLocationHints
>>> derived from the HDFS file split info at runtime. Did you see any
>>> unexpected behavior?
>>>
>>>
>>>
>>> Best Regard,
>>> Jeff Zhang
>>>
>>>
>>> From: Raajay <[email protected]>
>>> Reply-To: "[email protected]" <[email protected]>
>>> Date: Thursday, September 10, 2015 at 12:35 PM
>>> To: "[email protected]" <[email protected]>
>>> Subject: Error of setting vertex location hints
>>>
>>> In the WordCount example, I am trying to fix the location of map tasks
>>> by providing "VertexLocationHints" to the "tokenizer" vertex.
>>>
>>> However, the application fails with an exception (stack trace below). I
>>> guess it is because the vertex manager expects the parallelism to be -1,
>>> so that it can compute it itself.
>>>
>>>
>>> What minimal modification to the example would avoid invoking the
>>> VertexManager and allow me to use my own customized VertexLocationHint?
>>>
>>>
>>> Thanks
>>> Raajay
>>>
>>>
>>>
>>> DAG diagnostics: [Vertex failed, vertexName=Tokenizer,
>>> vertexId=vertex_1441839249749_0017_1_00, diagnostics=[Vertex
>>> vertex_1441839249749_0017_1_00 [Tokenizer] killed/failed due
>>> to:AM_USERCODE_FAILURE, Exception in VertexManager,
>>> vertex:vertex_1441839249749_0017_1_00 [Tokenizer],
>>> java.lang.IllegalStateException: Parallelism for the vertex should be set
>>> to -1 if the InputInitializer is setting parallelism, VertexName: Tokenizer
>>>         at
>>> com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>>>         at
>>> org.apache.tez.dag.app.dag.impl.RootInputVertexManager.onRootVertexInitialized(RootInputVertexManager.java:60)
>>>         at
>>> org.apache.tez.dag.app.dag.impl.VertexManager$VertexManagerEventRootInputInitialized.invoke(VertexManager.java:610)
>>>         at
>>> org.apache.tez.dag.app.dag.impl.VertexManager$VertexManagerEvent$1.run(VertexManager.java:631)
>>>         at
>>> org.apache.tez.dag.app.dag.impl.VertexManager$VertexManagerEvent$1.run(VertexManager.java:626)
>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>         at javax.security.auth.Subject.doAs(Subject.java:415)
>>>         at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
>>>         at
>>> org.apache.tez.dag.app.dag.impl.VertexManager$VertexManagerEvent.call(VertexManager.java:626)
>>>         at
>>> org.apache.tez.dag.app.dag.impl.VertexManager$VertexManagerEvent.call(VertexManager.java:615)
>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> ], Vertex killed, vertexName=Summation,
>>> vertexId=vertex_1441839249749_0017_1_01, diagnostics=[Vertex received Kill
>>> in INITED state., Vertex vertex_1441839249749_0017_1_01 [Summation]
>>> killed/failed due to:null], DAG did not succeed due to VERTEX_FAILURE.
>>> failedVertices:1 killedVertices:1]
>>> DAG did not succeed
>>>
>>>
>>
>
