Hi,
I need some help to figure out why one method of mine in a pipeline stops the
execution on the hdfs.
I am working with the 10.0-SNAPSHOT and the code is the following (see below).
The method stops on the hdfs by calling the collect method
(JoinPredictionAndOriginal.collect) creating a data sink, which is why the
program stops before the two output files at the ends can be created. What am I
missing?
Thank you for your time.
Best wishes,
Flo
// method calculates the prediction error
def CalcPredError(predictions: DataSet[LabeledVector], original:
DataSet[LabeledVector],
outputPath: String, outputPath2: String, outputPath3: String):
(DataSet[LabeledVector], Double) ={
var iter = 0
val transformPred = predictions
.map { tuple =>
iter = iter + 1
LabeledVector(iter, DenseVector(BigDecimal(tuple.label).setScale(0,
BigDecimal.RoundingMode.HALF_UP).toDouble))
}
iter = 0
val tranformOrg = original
.map { tuple =>
iter = iter + 1
LabeledVector(iter, DenseVector(tuple.label))
}
val JoinPredictionAndOriginal =
transformPred.join(tranformOrg).where(0).equalTo(0) {
(l, r) => (l.vector.head._2, r.vector.head._2)
}
val list_JoinPredictionAndOriginal = JoinPredictionAndOriginal.collect
val N = list_JoinPredictionAndOriginal.length
val residualSum = list_JoinPredictionAndOriginal.map {
num => pow((num._1 - num._2), 2)
}.sum
val predictionError = sqrt(residualSum / N)
original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)
(predictions,predictionError)
}