"Not sure of your purpose. Usually data locality can improve performance." - Mostly for study purposes :)

Thanks for the pointers to the file that needs to be changed. That helps a lot!

Raajay

On Thu, Sep 10, 2015 at 3:15 AM, Jianfeng (Jeff) Zhang <[email protected]> wrote:

> >>> I am trying to create a scenario where the mappers (root tasks) are
> necessarily not executed at the data location.
> Not sure of your purpose. Usually data locality can improve performance.
>
> >>> Can the number of tasks for the tokenizer be a value *NOT* equal to
> the number of HDFS blocks of the file?
> Yes, it can. There are two ways:
>
> * MRInput internally uses the InputFormat to determine how to split, so
> all the InputFormat settings that control splitting apply to MRInput too,
> such as mapreduce.input.fileinputformat.split.minsize and
> mapreduce.input.fileinputformat.split.maxsize.
>
> * Another way is to use TezGroupedSplitsInputFormat, which is provided by
> Tez. This InputFormat groups several splits together into a new split to
> be consumed by one mapper. You can use the following parameters to tune
> it; please also refer to MRInputConfigBuilder.groupSplits:
>
>    - tez.grouping.split-waves
>    - tez.grouping.max-size
>    - tez.grouping.min-size
>
> >>> Can a mapper be scheduled at a location different from the location
> of its input block? If yes, how?
> Yes, it is possible. However, Tez always uses the split info and there
> is no option to disable that, so if you really want to do this, you need
> to create a new InputInitializer.
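Both of Jeff's options change the task count through arithmetic on split sizes. The sketch below is plain Java with no Hadoop or Tez dependency. `computeSplitSize` and `countSplits` follow the split computation in Hadoop's new-API `FileInputFormat` (split size = max(minSize, min(maxSize, blockSize)), with a 10% slop allowed on the last split), where minSize and maxSize stand for mapreduce.input.fileinputformat.split.minsize and split.maxsize. `desiredGroups` is only a simplified, hypothetical model of split grouping, with `waves`, `minGroupSize` and `maxGroupSize` standing in for tez.grouping.split-waves, min-size and max-size; Tez's real grouping code also takes data locality into account.

```java
public class SplitMath {
    // Same formula as Hadoop's new-API FileInputFormat:
    // splitSize = max(minSize, min(maxSize, blockSize)).
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // The last split may be up to 10% larger instead of leaving a tiny tail split.
    static final double SPLIT_SLOP = 1.1;

    static int countSplits(long fileLength, long splitSize) {
        int splits = 0;
        long remaining = fileLength;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits++; // whatever is left becomes the final split
        }
        return splits;
    }

    // Hypothetical, simplified grouping model: aim for availableSlots * waves
    // groups, then clamp the bytes per group into [minGroupSize, maxGroupSize].
    static int desiredGroups(long totalLength, int availableSlots, double waves,
                             long minGroupSize, long maxGroupSize) {
        int desired = Math.max(1, (int) (availableSlots * waves));
        long lengthPerGroup = totalLength / desired;
        if (lengthPerGroup > maxGroupSize) {
            desired = (int) (totalLength / maxGroupSize) + 1;
        } else if (lengthPerGroup < minGroupSize) {
            desired = (int) (totalLength / minGroupSize) + 1;
        }
        return desired;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long blockSize = 128 * mb;
        long fileLength = 1024 * mb; // a 1 GB file stored as 8 blocks

        // Default-like settings: one split per block.
        long s1 = computeSplitSize(blockSize, 1, Long.MAX_VALUE);
        System.out.println("splits with defaults: " + countSplits(fileLength, s1));

        // A 256 MB minimum split size halves the task count.
        long s2 = computeSplitSize(blockSize, 256 * mb, Long.MAX_VALUE);
        System.out.println("splits with minsize=256MB: " + countSplits(fileLength, s2));

        // A 64 MB maximum split size doubles it.
        long s3 = computeSplitSize(blockSize, 1, 64 * mb);
        System.out.println("splits with maxsize=64MB: " + countSplits(fileLength, s3));

        // Grouping model: 2 slots * 1.7 waves gives 3 groups (within the size bounds).
        System.out.println("groups: " + desiredGroups(fileLength, 2, 1.7, 50 * mb, 1024 * mb));
    }
}
```

For the 1 GB file in 128 MB blocks, `main` reports 8 splits with the defaults, 4 with a 256 MB minimum and 16 with a 64 MB maximum, so the number of mapper tasks need not match the number of HDFS blocks.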
> I think you just need to make a few small changes to
> MRInputAMSplitGenerator:
>
> https://github.com/zjffdu/tez/blob/a3a7700dea0a315ad613aa2d8a7223eb73878cb5/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
>
> Specifically, change the following code snippet:
>
>   InputConfigureVertexTasksEvent configureVertexEvent =
>       InputConfigureVertexTasksEvent.create(
>           inputSplitInfo.getNumTasks(),
>           VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()), // make code changes here
>           InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
>   events.add(configureVertexEvent);
>
> Best regards,
> Jeff Zhang
>
>
> From: Raajay <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, September 10, 2015 at 2:07 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: Error of setting vertex location hints
>
> The input is an HDFS file. I am trying to create a scenario where the
> mappers (root tasks) are necessarily not executed at the data location. So
> for now, I chose the location hints for the tasks randomly. I figured
> that by populating the VertexLocationHint with the addresses of random
> nodes, I could achieve it.
>
> This requires setting the parallelism to the number of elements in the
> VertexLocationHint, which led to the errors.
>
> Summarizing, for the word count example:
>
> 1. Can the number of tasks for the tokenizer be a value *NOT* equal to the
> number of HDFS blocks of the file?
>
> 2. Can a mapper be scheduled at a location different from the location of
> its input block? If yes, how?
>
> Raajay
>
>
> On Thu, Sep 10, 2015 at 12:30 AM, Jianfeng (Jeff) Zhang <
> [email protected]> wrote:
>
>> >>> In the WordCount example, while creating the Tokenizer vertex,
>> neither the parallelism nor the VertexLocation hints are specified. My
>> guess is that at runtime, these values are populated based on the
>> InputInitializer.
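Jeff's suggested change amounts to replacing `inputSplitInfo.getTaskLocationHints()` with hints of your own while keeping `inputSplitInfo.getNumTasks()`, so the vertex parallelism is still decided by the initializer and the -1 precondition is not violated. The following is a minimal sketch of the host-selection part only, assuming you can obtain each split's hosts and a list of cluster node names; `RandomLocationHints` and `pickNonLocalHosts` are hypothetical names, and the code is plain Java with no Tez dependency.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

public class RandomLocationHints {
    // Hypothetical helper (not part of Tez): for every task, choose one cluster
    // node that holds no replica of that task's input split.
    static List<Set<String>> pickNonLocalHosts(List<Set<String>> splitHosts,
                                               List<String> clusterNodes,
                                               Random rng) {
        List<Set<String>> hints = new ArrayList<>();
        for (Set<String> localHosts : splitHosts) {
            // Candidates are the nodes that do not store this split.
            List<String> candidates = new ArrayList<>();
            for (String node : clusterNodes) {
                if (!localHosts.contains(node)) {
                    candidates.add(node);
                }
            }
            Set<String> chosen = new HashSet<>();
            if (!candidates.isEmpty()) {
                chosen.add(candidates.get(rng.nextInt(candidates.size())));
            }
            // If every node holds a replica, the set stays empty and the caller
            // must decide what to do. One entry per split keeps the task count unchanged.
            hints.add(chosen);
        }
        return hints;
    }

    public static void main(String[] args) {
        List<Set<String>> splitHosts = List.of(
                Set.of("node1", "node2"),
                Set.of("node2", "node3"));
        List<String> cluster = List.of("node1", "node2", "node3", "node4");
        System.out.println(pickNonLocalHosts(splitHosts, cluster, new Random(42)));
    }
}
```

Inside a modified generator, each returned host set would still have to be wrapped in a TaskLocationHint (Tez provides a `TaskLocationHint.createTaskLocationHint(hosts, racks)` factory; check the exact signature in your Tez version) before the list is passed to `VertexLocationHint.create(...)`.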
>> Correct, the parallelism and VertexLocationHint are specified at runtime
>> by the InputInitializer.
>>
>> >>> What should I do so that the locations of the tasks for the Tokenizer
>> vertex are not based on HDFS splits but can be arbitrarily configured at
>> creation time?
>> Do you mean your input is not an HDFS file? In that case, I think you need
>> to create your own DataSourceDescriptor. You can refer to the
>> DataSourceDescriptor that is used by the WordCount example, shown below. If
>> possible, let us know more about your context. What kind of data is your
>> input? And how would you specify the VertexLocationHint for your input?
>>
>>   DataSourceDescriptor dataSource = MRInput.createConfigBuilder(
>>           new Configuration(tezConf), TextInputFormat.class, inputPath)
>>       .groupSplits(!isDisableSplitGrouping())
>>       .build();
>>
>> Best regards,
>> Jeff Zhang
>>
>>
>> From: Raajay <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Thursday, September 10, 2015 at 1:10 PM
>> To: "[email protected]" <[email protected]>
>> Subject: Re: Error of setting vertex location hints
>>
>> I am just getting started with understanding the Tez code, so bear with
>> me; I might be wrong here.
>>
>> In the WordCount example, while creating the Tokenizer vertex, neither
>> the parallelism nor the VertexLocation hints are specified. My guess is
>> that at runtime, these values are populated based on the InputInitializer.
>>
>> However, I do not want them to be populated at runtime; rather, I want
>> them specified while creating the DAG itself. When I do that, I get the
>> exception mentioned in the previous mail.
>>
>> What should I do so that the locations of the tasks for the Tokenizer
>> vertex are not based on HDFS splits but can be arbitrarily configured at
>> creation time?
>>
>> Raajay
>>
>>
>> On Thu, Sep 10, 2015 at 12:01 AM, Jianfeng (Jeff) Zhang <
>> [email protected]> wrote:
>>
>>>
>>> Actually, the Tokenizer vertex should already get its VertexLocationHint
>>> from the HDFS file split info at runtime. Did you see any unexpected
>>> behavior?
>>>
>>> Best regards,
>>> Jeff Zhang
>>>
>>>
>>> From: Raajay <[email protected]>
>>> Reply-To: "[email protected]" <[email protected]>
>>> Date: Thursday, September 10, 2015 at 12:35 PM
>>> To: "[email protected]" <[email protected]>
>>> Subject: Error of setting vertex location hints
>>>
>>> In the WordCount example, I am trying to fix the location of map tasks
>>> by providing "VertexLocationHints" to the "tokenizer" vertex.
>>>
>>> However, the application fails with an exception (stack trace below). I
>>> guess this is because the vertex manager expects the parallelism to be
>>> -1 so that it can compute it.
>>>
>>> What minimal modification to the example would avoid invoking the
>>> VertexManager and allow me to use my own customized VertexLocationHint?
>>>
>>> Thanks
>>> Raajay
>>>
>>>
>>> DAG diagnostics: [Vertex failed, vertexName=Tokenizer,
>>> vertexId=vertex_1441839249749_0017_1_00, diagnostics=[Vertex
>>> vertex_1441839249749_0017_1_00 [Tokenizer] killed/failed due
>>> to:AM_USERCODE_FAILURE, Exception in VertexManager,
>>> vertex:vertex_1441839249749_0017_1_00 [Tokenizer],
>>> java.lang.IllegalStateException: Parallelism for the vertex should be set
>>> to -1 if the InputInitializer is setting parallelism, VertexName: Tokenizer
>>>   at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>>>   at org.apache.tez.dag.app.dag.impl.RootInputVertexManager.onRootVertexInitialized(RootInputVertexManager.java:60)
>>>   at org.apache.tez.dag.app.dag.impl.VertexManager$VertexManagerEventRootInputInitialized.invoke(VertexManager.java:610)
>>>   at org.apache.tez.dag.app.dag.impl.VertexManager$VertexManagerEvent$1.run(VertexManager.java:631)
>>>   at org.apache.tez.dag.app.dag.impl.VertexManager$VertexManagerEvent$1.run(VertexManager.java:626)
>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>   at javax.security.auth.Subject.doAs(Subject.java:415)
>>>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
>>>   at org.apache.tez.dag.app.dag.impl.VertexManager$VertexManagerEvent.call(VertexManager.java:626)
>>>   at org.apache.tez.dag.app.dag.impl.VertexManager$VertexManagerEvent.call(VertexManager.java:615)
>>>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>   at java.lang.Thread.run(Thread.java:745)
>>> ], Vertex killed, vertexName=Summation,
>>> vertexId=vertex_1441839249749_0017_1_01, diagnostics=[Vertex received Kill
>>> in INITED state., Vertex vertex_1441839249749_0017_1_01 [Summation]
>>> killed/failed due to:null], DAG did not succeed due to VERTEX_FAILURE.
>>> failedVertices:1 killedVertices:1]
>>> DAG did not succeed
>>>
>>
>
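The stack trace comes from a precondition checked in RootInputVertexManager: once MRInput's InputInitializer reports a task count at runtime, the vertex must not already have a fixed parallelism. The sketch below is a plain-Java model of that rule for illustration only; `ParallelismCheck` and `resolveParallelism` are hypothetical names, not Tez classes, and only the exception message is copied from the log above.

```java
public class ParallelismCheck {
    // -1 means "not decided yet; let the InputInitializer set it at runtime".
    static final int UNDECIDED = -1;

    // Hypothetical model of the check: accept the initializer's task count only
    // if the vertex was created without a fixed parallelism.
    static int resolveParallelism(String vertexName, int configuredParallelism,
                                  int tasksFromInitializer) {
        if (configuredParallelism != UNDECIDED) {
            throw new IllegalStateException(
                    "Parallelism for the vertex should be set to -1 if the "
                    + "InputInitializer is setting parallelism, VertexName: " + vertexName);
        }
        return tasksFromInitializer;
    }

    public static void main(String[] args) {
        // Parallelism left undecided: the initializer's count (here 8 splits) is used.
        System.out.println(resolveParallelism("Tokenizer", UNDECIDED, 8));
        try {
            // Parallelism fixed at DAG-construction time while the initializer
            // still runs: this is the situation that failed in the thread.
            resolveParallelism("Tokenizer", 4, 8);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

This is why customizing the hints inside the initializer (keeping its task count) avoids the error, while setting parallelism and hints on the vertex up front does not.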
