>>> I am trying to create a scenario where the mappers (root tasks) are
>>> deliberately not executed at the data location
I'm not sure about your purpose; usually data locality improves performance.
>>> Can the number of tasks for the tokenizer be a value *NOT* equal to the
>>> number of HDFS blocks of the file?
Yes, it can. There are two ways:
* MRInput internally uses an InputFormat to determine how to split, so all of
InputFormat's split-control settings apply to MRInput too, e.g.
mapreduce.input.fileinputformat.split.minsize and
mapreduce.input.fileinputformat.split.maxsize.
* Another way is to use TezGroupedSplitsInputFormat, which is provided by Tez.
This InputFormat groups several splits together into a new split that is
consumed by one mapper. You can tune it with the following parameters; please
refer to MRInputConfigBuilder.groupSplits (a rough configuration sketch follows
this list):
* tez.grouping.split-waves
* tez.grouping.max-size
* tez.grouping.min-size
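For illustration only (this sketch is not from the original mail): assuming the
tezConf and inputPath variables from the WordCount example, the knobs above
would be set roughly like this; the concrete sizes and wave count are
placeholder values, not recommendations.

  Configuration conf = new Configuration(tezConf);

  // Way 1: let the underlying InputFormat cut more or fewer splits.
  conf.setLong("mapreduce.input.fileinputformat.split.minsize", 64L * 1024 * 1024);
  conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024);

  // Way 2: let Tez group splits; these bound the size of each grouped split
  // and the number of task "waves" relative to available cluster capacity.
  conf.setFloat("tez.grouping.split-waves", 1.7f);
  conf.setLong("tez.grouping.min-size", 16L * 1024 * 1024);
  conf.setLong("tez.grouping.max-size", 1024L * 1024 * 1024);

  DataSourceDescriptor dataSource = MRInput
      .createConfigBuilder(conf, TextInputFormat.class, inputPath)
      .groupSplits(true)   // see MRInputConfigBuilder.groupSplits
      .build();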
>>> Can a mapper be scheduled at a location different from the location of its
>>> input block? If yes, how?
Yes, it is possible. Tez will always use the split info; there's no option
to disable it. If you really want this, you need to create a new InputInitializer.
I think you only need to make a small change to MRInputAMSplitGenerator:
https://github.com/zjffdu/tez/blob/a3a7700dea0a315ad613aa2d8a7223eb73878cb5/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
Specifically, a small change in the following code snippet is enough:
InputConfigureVertexTasksEvent configureVertexEvent =
    InputConfigureVertexTasksEvent.create(
        inputSplitInfo.getNumTasks(),
        VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()),
        // make code changes here
        InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
events.add(configureVertexEvent);
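To make that concrete, here is an untested sketch of the kind of change meant
above, inside a copy of MRInputAMSplitGenerator: replace the hints derived from
the HDFS block locations with arbitrary ones. The host name below is a
placeholder, and the usual java.util imports are assumed.

  // Build custom hints instead of inputSplitInfo.getTaskLocationHints().
  List<TaskLocationHint> customHints = new ArrayList<>();
  for (int i = 0; i < inputSplitInfo.getNumTasks(); i++) {
    // Pin each task to an arbitrary host, ignoring the block locations;
    // racks are left null.
    customHints.add(TaskLocationHint.createTaskLocationHint(
        Collections.singleton("some-other-host"), null));
  }

  InputConfigureVertexTasksEvent configureVertexEvent =
      InputConfigureVertexTasksEvent.create(
          inputSplitInfo.getNumTasks(),
          VertexLocationHint.create(customHints),
          InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
  events.add(configureVertexEvent);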
Best Regards,
Jeff Zhang
From: Raajay <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, September 10, 2015 at 2:07 PM
To: "[email protected]" <[email protected]>
Subject: Re: Error of setting vertex location hints
The input is an HDFS file. I am trying to create a scenario where the mappers
(root tasks) are deliberately not executed at the data location. So for now, I
chose the location hints for the tasks in a random fashion. I figured that by
populating the VertexLocationHint with the addresses of random nodes, I could
achieve this.
This requires setting the parallelism to the number of elements in the
VertexLocationHint, which led to the errors.
Summarizing, for the word count example:
1. Can the number of tasks for the tokenizer be a value *NOT* equal to the
number of HDFS blocks of the file?
2. Can a mapper be scheduled at a location different from the location of its
input block? If yes, how?
Raajay
On Thu, Sep 10, 2015 at 12:30 AM, Jianfeng (Jeff) Zhang <[email protected]> wrote:
>>> In the WordCount example, while creating the Tokenizer vertex, neither the
>>> parallelism nor the VertexLocationHint is specified. My guess is that these
>>> values are populated at runtime, based on the InputInitializer.
Correct, the parallelism and the VertexLocationHint are specified at runtime by
the InputInitializer.
>>> What should I do so that the locations of the tasks for the Tokenizer vertex
>>> are not based on HDFS splits but can be arbitrarily configured at DAG
>>> creation time?
Do you mean your input is not an HDFS file? In that case, I think you need to
create your own DataSourceDescriptor. You can refer to the DataSourceDescriptor
used by the WordCount example, shown below. If possible, let us know more about
your context: what kind of data is your input, and how would you specify the
VertexLocationHint for it?
DataSourceDescriptor dataSource = MRInput.createConfigBuilder(
    new Configuration(tezConf), TextInputFormat.class, inputPath)
    .groupSplits(!isDisableSplitGrouping())
    .build();
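For completeness, in the WordCount example that dataSource is then attached to
the root vertex roughly like this; the parallelism is left unset so that the
InputInitializer can determine it at runtime:

  Vertex tokenizerVertex = Vertex.create("Tokenizer",
      ProcessorDescriptor.create(TokenProcessor.class.getName()))
      .addDataSource("Input", dataSource);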
Best Regards,
Jeff Zhang
From: Raajay <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, September 10, 2015 at 1:10 PM
To: "[email protected]" <[email protected]>
Subject: Re: Error of setting vertex location hints
I am just getting started with understanding the Tez code, so bear with me; I
might be wrong here.
In the WordCount example, while creating the Tokenizer vertex, neither the
parallelism nor the VertexLocationHint is specified. My guess is that these
values are populated at runtime, based on the InputInitializer.
However, I do not want them to be populated at runtime; rather, I want them
specified while creating the DAG itself. When I do that, I get the exception
mentioned in the previous mail.
What should I do so that the locations of the tasks for the Tokenizer vertex are
not based on HDFS splits but can be arbitrarily configured at DAG creation time?
Raajay
On Thu, Sep 10, 2015 at 12:01 AM, Jianfeng (Jeff) Zhang <[email protected]> wrote:
Actually, the Tokenizer vertex should already have the VertexLocationHints from
the HDFS file split info at runtime. Did you see any unexpected behavior?
Best Regards,
Jeff Zhang
From: Raajay <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, September 10, 2015 at 12:35 PM
To: "[email protected]" <[email protected]>
Subject: Error of setting vertex location hints
Subject: Error of setting vertex location hints
In the WordCount example, I am trying to fix the locations of the map tasks by
providing "VertexLocationHints" to the "tokenizer" vertex.
However, the application fails with an exception (stack trace below). I guess
this is because the vertex manager expects the parallelism to be -1, so that it
can compute it itself.
What minimal modification to the example would avoid invoking the VertexManager
and allow me to use my own customized VertexLocationHint?
Thanks
Raajay
DAG diagnostics: [Vertex failed, vertexName=Tokenizer,
vertexId=vertex_1441839249749_0017_1_00, diagnostics=[Vertex
vertex_1441839249749_0017_1_00 [Tokenizer] killed/failed due
to:AM_USERCODE_FAILURE, Exception in VertexManager,
vertex:vertex_1441839249749_0017_1_00 [Tokenizer],
java.lang.IllegalStateException: Parallelism for the vertex should be set to -1
if the InputInitializer is setting parallelism, VertexName: Tokenizer
        at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
        at org.apache.tez.dag.app.dag.impl.RootInputVertexManager.onRootVertexInitialized(RootInputVertexManager.java:60)
        at org.apache.tez.dag.app.dag.impl.VertexManager$VertexManagerEventRootInputInitialized.invoke(VertexManager.java:610)
        at org.apache.tez.dag.app.dag.impl.VertexManager$VertexManagerEvent$1.run(VertexManager.java:631)
        at org.apache.tez.dag.app.dag.impl.VertexManager$VertexManagerEvent$1.run(VertexManager.java:626)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
        at org.apache.tez.dag.app.dag.impl.VertexManager$VertexManagerEvent.call(VertexManager.java:626)
        at org.apache.tez.dag.app.dag.impl.VertexManager$VertexManagerEvent.call(VertexManager.java:615)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
], Vertex killed, vertexName=Summation,
vertexId=vertex_1441839249749_0017_1_01, diagnostics=[Vertex received Kill in
INITED state., Vertex vertex_1441839249749_0017_1_01 [Summation] killed/failed
due to:null], DAG did not succeed due to VERTEX_FAILURE. failedVertices:1
killedVertices:1]
DAG did not succeed