Could you post a minimal example of your code where the problem is
reproducible? I assume that there has to be another problem because
env.execute should actually trigger the execution.
Cheers,
Till
On Thu, Oct 8, 2015 at 8:58 PM, Florian Heyl <f.h...@gmx.de> wrote:
> Hey Stephan and Pieter,
> That was what I thought as well, so I simply changed the code like this:
>
> original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
>
> env.execute()
>
> transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)
>
> env.execute()
>
> But it still does not execute the two write commands.
> Thank you for your time.
>
> Flo
>
>
> Am 08.10.2015 um 17:41 schrieb Stephan Ewen <se...@apache.org>:
>
> Yes, sinks in Flink are lazy and do not trigger execution automatically.
> We made this choice to allow multiple concurrent sinks (splitting the
> streams and writing to many outputs concurrently). That requires explicit
> execution triggers (env.execute()).
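>
> For example, a minimal sketch (hypothetical paths and data, not your actual
> job) of two sinks driven by one explicit trigger:
>
>   import org.apache.flink.api.scala._
>   import org.apache.flink.core.fs.FileSystem.WriteMode
>
>   val env  = ExecutionEnvironment.getExecutionEnvironment
>   val data = env.fromElements((1, "a"), (2, "b"))
>
>   // two sinks registered on the same plan
>   data.writeAsCsv("hdfs:///out/first", "\n", " ", WriteMode.OVERWRITE)
>   data.map(t => (t._2, t._1))
>     .writeAsCsv("hdfs:///out/second", "\n", " ", WriteMode.OVERWRITE)
>
>   // one explicit trigger runs both sinks in a single job
>   env.execute()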
>
> The exceptions are, as mentioned, the "eager" methods "collect()",
> "count()" and "print()". They need to be eager because the driver program
> needs, for example, the result of "count()" before it can possibly progress.
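>
> A tiny illustration of the eager case (reusing the hypothetical sketch
> above): count() submits a job immediately, because its result is needed in
> the driver:
>
>   val n: Long = data.count() // eager: triggers execution right away
>   println(s"records: $n")    // the driver uses the value to progress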
>
> Stephan
>
>
> On Thu, Oct 8, 2015 at 5:22 PM, Pieter Hameete <phame...@gmail.com> wrote:
>
>> Hi Florian,
>>
>> I believe that when you call *JoinPredictionAndOriginal.collect* the
>> environment will execute your program up until that point. The Csv writes
>> are after this point, so in order to execute these steps I think you would
>> have to call *<env>.execute()* after the Csv writes to trigger the
>> execution (where <env> is the name of the variable pointing to your
>> ExecutionEnvironment).
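>>
>> As a rough sketch (using the names from your snippet, with env standing
>> for your ExecutionEnvironment variable), the order would be:
>>
>>   // eager: collect() runs the plan up to this point
>>   val results = JoinPredictionAndOriginal.collect()
>>
>>   // lazy sinks: these are only registered, not yet executed
>>   original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
>>   transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)
>>
>>   // explicit trigger for the sinks defined after the collect
>>   env.execute()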
>>
>> I hope this helps :-)
>>
>> - Pieter
>>
>> 2015-10-08 14:54 GMT+02:00 Florian Heyl <f.h...@gmx.de>:
>>
>>> Hi,
>>> I need some help figuring out why one method in my pipeline stops the
>>> execution on HDFS.
>>> I am working with the 10.0-SNAPSHOT and the code is the following (see
>>> below). The method creates a data sink by calling the collect method
>>> (JoinPredictionAndOriginal.collect), and execution stops there, which is
>>> why the two output files at the end are never created.
>>> What am I missing?
>>> Thank you for your time.
>>>
>>> Best wishes,
>>> Flo
>>>
>>> // method calculates the prediction error (root mean squared error)
>>> // (imports assumed from the full program: org.apache.flink.api.scala._,
>>> // FlinkML's LabeledVector/DenseVector, and scala.math.{pow, sqrt})
>>> def CalcPredError(predictions: DataSet[LabeledVector], original: DataSet[LabeledVector],
>>>                   outputPath: String, outputPath2: String, outputPath3: String):
>>>     (DataSet[LabeledVector], Double) = {
>>>
>>>   var iter = 0
>>>
>>>   // round each prediction label and index it with a running counter
>>>   val transformPred = predictions.map { tuple =>
>>>     iter = iter + 1
>>>     LabeledVector(iter, DenseVector(
>>>       BigDecimal(tuple.label).setScale(0, BigDecimal.RoundingMode.HALF_UP).toDouble))
>>>   }
>>>
>>>   iter = 0
>>>
>>>   // index the original labels with the same running counter
>>>   val tranformOrg = original.map { tuple =>
>>>     iter = iter + 1
>>>     LabeledVector(iter, DenseVector(tuple.label))
>>>   }
>>>
>>>   // join predictions and originals on the index, keeping both label values
>>>   val JoinPredictionAndOriginal =
>>>     transformPred.join(tranformOrg).where(0).equalTo(0) {
>>>       (l, r) => (l.vector.head._2, r.vector.head._2)
>>>     }
>>>
>>>   // eager: collect() triggers execution of the plan up to this point
>>>   val list_JoinPredictionAndOriginal = JoinPredictionAndOriginal.collect
>>>
>>>   val N = list_JoinPredictionAndOriginal.length
>>>
>>>   val residualSum = list_JoinPredictionAndOriginal.map {
>>>     num => pow(num._1 - num._2, 2)
>>>   }.sum
>>>
>>>   val predictionError = sqrt(residualSum / N)
>>>
>>>   // lazy sinks: registered here but not executed without env.execute()
>>>   original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
>>>   transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)
>>>
>>>   (predictions, predictionError)
>>> }
>>>
>>
>>
>
>