Could you post a minimal example of your code where the problem is reproducible? I assume there must be another problem, because env.execute() should indeed trigger the execution.
Cheers,
Till

On Thu, Oct 8, 2015 at 8:58 PM, Florian Heyl <f.h...@gmx.de> wrote:

> Hey Stephan and Pieter,
> That was what I thought as well, so I simply changed the code like this:
>
> original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
> env.execute()
>
> transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)
> env.execute()
>
> But it still does not execute the two commands.
> Thank you for your time.
>
> Flo
>
>
> On 08.10.2015, at 17:41, Stephan Ewen <se...@apache.org> wrote:
>
> Yes, sinks in Flink are lazy and do not trigger execution automatically.
> We made this choice to allow multiple concurrent sinks (splitting the
> streams and writing to many outputs concurrently). That requires explicit
> execution triggers (env.execute()).
>
> The exceptions are, as mentioned, the "eager" methods "collect()",
> "count()" and "print()". They need to be eager, because the driver program
> needs, for example, the "count()" value before it can progress.
>
> Stephan
>
>
> On Thu, Oct 8, 2015 at 5:22 PM, Pieter Hameete <phame...@gmail.com> wrote:
>
>> Hi Florian,
>>
>> I believe that when you call *JoinPredictionAndOriginal.collect* the
>> environment will execute your program up until that point. The CSV writes
>> come after this point, so in order to execute these steps I think you
>> would have to call *<env>.execute()* after the CSV writes to trigger the
>> execution (where <env> is the name of the variable pointing to your
>> ExecutionEnvironment).
>>
>> I hope this helps :-)
>>
>> - Pieter
>>
>> 2015-10-08 14:54 GMT+02:00 Florian Heyl <f.h...@gmx.de>:
>>
>>> Hi,
>>> I need some help figuring out why one method of mine in a pipeline
>>> stops the execution on HDFS.
>>> I am working with the 0.10-SNAPSHOT and the code is the following (see
>>> below). The method stops on HDFS because calling the collect method
>>> (JoinPredictionAndOriginal.collect) creates a data sink, which is why
>>> the program stops before the two output files at the end can be created.
>>> What am I missing?
>>> Thank you for your time.
>>>
>>> Best wishes,
>>> Flo
>>>
>>> // method calculates the prediction error
>>> def CalcPredError(predictions: DataSet[LabeledVector], original: DataSet[LabeledVector],
>>>     outputPath: String, outputPath2: String, outputPath3: String): (DataSet[LabeledVector], Double) = {
>>>
>>>   var iter = 0
>>>
>>>   val transformPred = predictions
>>>     .map { tuple =>
>>>       iter = iter + 1
>>>       LabeledVector(iter, DenseVector(BigDecimal(tuple.label).setScale(0,
>>>         BigDecimal.RoundingMode.HALF_UP).toDouble))
>>>     }
>>>
>>>   iter = 0
>>>
>>>   val tranformOrg = original
>>>     .map { tuple =>
>>>       iter = iter + 1
>>>       LabeledVector(iter, DenseVector(tuple.label))
>>>     }
>>>
>>>   val JoinPredictionAndOriginal =
>>>     transformPred.join(tranformOrg).where(0).equalTo(0) {
>>>       (l, r) => (l.vector.head._2, r.vector.head._2)
>>>     }
>>>
>>>   val list_JoinPredictionAndOriginal = JoinPredictionAndOriginal.collect
>>>
>>>   val N = list_JoinPredictionAndOriginal.length
>>>
>>>   val residualSum = list_JoinPredictionAndOriginal.map {
>>>     num => pow((num._1 - num._2), 2)
>>>   }.sum
>>>
>>>   val predictionError = sqrt(residualSum / N)
>>>
>>>   original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
>>>   transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)
>>>
>>>   (predictions, predictionError)
>>> }
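For completeness, here is a minimal, self-contained sketch of the behaviour discussed above, written against the Flink Scala DataSet API (the data, paths, object name, and job name are made up for illustration): collect() eagerly executes the plan built up to that point, while writeAsCsv() only registers a sink that runs once env.execute() is called afterwards.

    import org.apache.flink.api.scala._
    import org.apache.flink.core.fs.FileSystem.WriteMode

    object LazySinkSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        val data: DataSet[(Int, Double)] =
          env.fromElements((1, 1.0), (2, 2.5), (3, 4.0))

        // collect() is eager: it executes the plan built so far and
        // ships the result back to the driver program.
        val local = data.collect()
        println(s"collected ${local.size} elements")

        // writeAsCsv() is lazy: it only registers a sink.
        data.writeAsCsv("/tmp/lazy-sink-sketch", "\n", " ", WriteMode.OVERWRITE)

        // Nothing is written until the plan containing the sink is executed.
        env.execute("lazy sink sketch")
      }
    }

This also illustrates Stephan's point about the design choice: because sinks are lazy, you can register several writes and have a single env.execute() run them all concurrently as one job.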