1a. ah. yeah i see how it could work, but i wouldn't count on it in a cluster. you would (most likely) run the sub-job (calculating pi) only on a single node.

1b. different execution environments generally imply different flink programs.

2. sure it does, since it's a normal flink job. yours, on the other hand, doesn't, since the job calculating PI only runs on a single TaskManager.

3. there are 2 ways. you can either chain jobs like this (effectively running 2 flink programs in succession):

public static void main(String[] args) throws Exception {
    double pi = new classPI().compute();
    System.out.println("We estimate Pi to be: " + pi);
    new classThatNeedsPI().computeWhatever(pi); // feeds pi into an env.fromElements call and proceeds from there
}
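for this first (chained) variant, here is a minimal sketch of what the second program could look like. classThatNeedsPI and computeWhatever are just the placeholder names from above, not existing classes, and the appended transformations are only an example:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class classThatNeedsPI {

    public void computeWhatever(double pi) throws Exception {
        // a second, independent Flink program that starts from the already computed value
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Double> piSet = env.fromElements(pi);
        // ...append whatever transformations you need to piSet...
        piSet.print(); // print() triggers execution of this second job
    }
}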

or (if all building blocks are flink programs) build a single job:

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Double> pi = new classPI(env).compute();
    new classThatNeedsPI(env).computeWhatever(pi); // append your transformations to pi
    env.execute();
}

...

public DataSet<Double> compute() throws Exception {
    return this.env.generateSequence(1, NumIter)
                   .map(new Sampler())
                   .reduce(new SumReducer())
                   .map(/* return 4 * x */);
}

...

public ? computeWhatever(DataSet<Double> pi) throws Exception { ... }
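and for the single-job variant, one possible way to fill in the placeholders. classThatNeedsPI, computeWhatever, PiMapper and ScaleByPi are made-up names, Sampler and SumReducer are the classes from your original program, and the cross with a sequence plus the output path are only stand-ins for whatever you actually need:

// imports assumed at the top of the enclosing file:
// import org.apache.flink.api.common.functions.CrossFunction;
// import org.apache.flink.api.common.functions.MapFunction;
// import org.apache.flink.api.java.DataSet;
// import org.apache.flink.api.java.ExecutionEnvironment;
// import org.apache.flink.core.fs.FileSystem;

// meant as static nested classes next to Sampler and SumReducer
public static final class classPI {
    private final ExecutionEnvironment env;
    private final long numIter = 1000000;

    public classPI(ExecutionEnvironment env) { this.env = env; }

    // only builds the plan; nothing runs until env.execute() is called in main
    public DataSet<Double> compute() {
        return env.generateSequence(1, numIter)
                  .map(new Sampler())
                  .reduce(new SumReducer())
                  .map(new PiMapper(numIter));
    }
}

// turns the sample count into the pi estimate; static, so it is trivially serializable
public static final class PiMapper implements MapFunction<Long, Double> {
    private final long numIter;
    public PiMapper(long numIter) { this.numIter = numIter; }
    @Override
    public Double map(Long count) { return 4.0 * count / numIter; }
}

public static final class classThatNeedsPI {
    private final ExecutionEnvironment env;

    public classThatNeedsPI(ExecutionEnvironment env) { this.env = env; }

    // appends further transformations to the pi DataSet and ends in a sink,
    // so that env.execute() in main has something to run
    public void computeWhatever(DataSet<Double> pi) {
        env.generateSequence(1, 10)
           .cross(pi)
           .with(new ScaleByPi())
           .writeAsText("/tmp/scaled-by-pi", FileSystem.WriteMode.OVERWRITE);
    }
}

// multiplies every element of the sequence by the (single-element) pi DataSet
public static final class ScaleByPi implements CrossFunction<Long, Double, Double> {
    @Override
    public Double cross(Long value, Double pi) { return value * pi; }
}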


On 07.06.2016 13:35, Ser Kho wrote:
Chesnay:
1a. The code actually works, that is the point.
1b. What restricts a Flink program from having several execution environments?
2. I am not sure that your modification allows for parallelism. Does it?
3. This code is a simple example of writing/organizing large and complicated programs, where the result of this pi needs to be used in other DataSet transformations beyond classPI(). What should be done in this case?
Thanks a lot for the suggestions.


On Tuesday, June 7, 2016 6:15 AM, Chesnay Schepler <ches...@apache.org> wrote:


from what i can tell from your code, you are trying to execute a job within a job. This just doesn't work.

your main method should look like this:

public static void main(String[] args) throws Exception {
    double pi = new classPI().compute();
    System.out.println("We estimate Pi to be: " + pi);
}
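a classPI that fits this main method would keep the only ExecutionEnvironment inside compute() and return a plain double, so there is exactly one Flink job and no nesting. a minimal sketch, reusing the Sampler and SumReducer classes from your program:

public static final class classPI {
    private final int numIter = 1000000;

    // builds and runs a single Flink job, then brings the single result back to the client
    public double compute() throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        long count = env.generateSequence(1, numIter)
                        .map(new Sampler())       // Sampler from your original code
                        .reduce(new SumReducer()) // SumReducer from your original code
                        .collect()
                        .get(0);
        return 4.0 * count / numIter;
    }
}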



On 06.06.2016 21:14, Ser Kho wrote:
The question is how to encapsulate numerous transformations into one object, or maybe a function, in an Apache Flink Java setting. I have tried to investigate this question using an example of Pi calculation (see below). I am wondering whether or not the suggested approach is valid from Flink's point of view. It works on one computer; however, I do not know how it will behave in a cluster setup. The code is given below, and the main idea behind it is as follows:

 1. Create a class, named classPI, whose method compute() does all
    data transformations; see more about it below.

 2. In the main method, create a DataSet as in

    DataSet<classPI> opi = env.fromElements(new classPI());

 3. Create DataSet<Double> PI, which is the output of a map()
    transformation that calls compute() on the classPI object, as in

    DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
        public Double map(classPI objPI) throws Exception {
            return objPI.compute();
        }
    });

 4. Now about classPI:

     * Its constructor instantiates an ExecutionEnvironment, which is
       local to this class, as in

       public classPI() {
           this.NumIter = 1000000;
           env = ExecutionEnvironment.getExecutionEnvironment();
       }

Thus, the code has two ExecutionEnvironment objects: one in main and another in the class classPI.

     * It also has a method compute() that runs all data
       transformations (in this example it is just several lines, but
       potentially it might contain tons of Flink transformations):

       public Double compute() throws Exception {
           DataSet<Long> count = env.generateSequence(1, NumIter)
                                    .map(new Sampler())
                                    .reduce(new SumReducer());
           PI = 4.0 * count.collect().get(0) / NumIter;
           return PI;
       }

The whole code is given below. Again, the question is whether this is a valid approach for encapsulating data transformations into a class in a Flink setup that is supposed to be parallelizable and work on a cluster. Is there a better way to hide the details of data transformations?
Thanks a lot!

-------------------------The code ----------------------

public class PiEstimation {

    public static void main(String[] args) throws Exception {
        // this is one ExecutionEnvironment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // this is the critical DataSet holding my classPI that computes PI
        DataSet<classPI> opi = env.fromElements(new classPI());

        // this map calls the method compute() of class classPI that computes PI
        DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
            public Double map(classPI objPI) throws Exception {
                // this is how I call method compute() that calculates PI using transformations
                return objPI.compute();
            }
        });

        double pi = PI.collect().get(0);
        System.out.println("We estimate Pi to be: " + pi);
    }

    // this class is of no importance for my question, however, it is relevant for pi calculation
    public static class Sampler implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            double x = Math.random();
            double y = Math.random();
            return (x * x + y * y) < 1 ? 1L : 0L;
        }
    }

    // this class is of no importance for my question, however, it is relevant for pi calculation
    public static final class SumReducer implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long value1, Long value2) {
            return value1 + value2;
        }
    }

    // this is my class that computes PI; my question is whether such a class is valid
    // in Flink on a cluster with parallel computation
    public static final class classPI {
        public Integer NumIter;
        private final ExecutionEnvironment env;
        public Double PI;

        // this is the constructor with another ExecutionEnvironment
        public classPI() {
            this.NumIter = 1000000;
            env = ExecutionEnvironment.getExecutionEnvironment();
        }

        // this is the method that contains all data transformations
        public Double compute() throws Exception {
            DataSet<Long> count = env.generateSequence(1, NumIter)
                                     .map(new Sampler())
                                     .reduce(new SumReducer());
            PI = 4.0 * count.collect().get(0) / NumIter;
            return PI;
        }
    }
}



