Hi
I am trying to understand the "Streaming linear regression" example provided in
https://spark.apache.org/docs/1.2.1/mllib-linear-methods.html
Code:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream

object StreamingLinReg {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamLinReg").setMaster("local[2]")
    // 10-second batches
    val ssc = new StreamingContext(conf, Seconds(10))

    // New files in these directories are parsed as "(label,[f1,f2,f3])"
    val trainingData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/training/").map(LabeledPoint.parse).cache()
    val testData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/testing/").map(LabeledPoint.parse)

    val numFeatures = 3
    val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))

    // Update the weights on every training batch
    model.trainOn(trainingData)
    // Print (label, prediction) pairs for every test batch
    model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
I compiled and ran the code, then put a file with the following content into the training directory:
(1.0,[2.0,2.0,2.0])
(2.0,[3.0,3.0,3.0])
(3.0,[4.0,4.0,4.0])
(4.0,[5.0,5.0,5.0])
(5.0,[6.0,6.0,6.0])
(6.0,[7.0,7.0,7.0])
(7.0,[8.0,8.0,8.0])
(8.0,[9.0,9.0,9.0])
(9.0,[10.0,10.0,10.0])
I can see that the model's weights change:
15/03/14 08:53:40 INFO StreamingLinearRegressionWithSGD: Current model: weights, [7.333333333333333,7.333333333333333,7.333333333333333]
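As a sanity check of what these weights mean, the plain-Scala sketch below (no Spark needed) applies the logged weights to the feature vectors from the file. It assumes the model has no intercept term, which is an assumption since the example only sets initial weights. Each vector (v, v, v) gives roughly 3 * 7.333 * v = 22 * v, so a model using these weights should not print 0.0, and its predictions would also be far from the training labels (v - 1).

```scala
object ExpectedPredictions {
  // Weights copied from the "Current model" log line above.
  val weights: Array[Double] = Array.fill(3)(7.333333333333333)
  // Assumption: no intercept term in the streaming model.
  val intercept: Double = 0.0

  // Linear regression prediction: w . x + b
  def predict(features: Array[Double]): Double =
    weights.zip(features).map { case (w, x) => w * x }.sum + intercept

  def main(args: Array[String]): Unit = {
    // Feature vectors from the file: (v, v, v) for v = 2..10
    for (v <- 2 to 10) {
      val x = Array.fill(3)(v.toDouble)
      println(f"features=[$v%d,$v%d,$v%d] -> prediction=${predict(x)}%.2f")
    }
  }
}
```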
Now I can put whatever I want into the testing directory, but I cannot understand the answer.
For example, I can put the same file I used for training into the testing directory. The file content is:
(1.0,[2.0,2.0,2.0])
(2.0,[3.0,3.0,3.0])
(3.0,[4.0,4.0,4.0])
(4.0,[5.0,5.0,5.0])
(5.0,[6.0,6.0,6.0])
(6.0,[7.0,7.0,7.0])
(7.0,[8.0,8.0,8.0])
(8.0,[9.0,9.0,9.0])
(9.0,[10.0,10.0,10.0])
and the answer is:
(1.0,0.0)
(2.0,0.0)
(3.0,0.0)
(4.0,0.0)
(5.0,0.0)
(6.0,0.0)
(7.0,0.0)
(8.0,0.0)
(9.0,0.0)
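In each printed pair, the first element is the key passed to predictOnValues (here lp.label, taken from the test file) and the second element is the prediction. The sketch below imitates that on plain Scala collections with a hypothetical predictOnValues helper (a stand-in, not the Spark method), assuming zero intercept. With all-zero weights, as set by Vectors.zeros(numFeatures), it prints the same nine pairs as above, which suggests the predictions are being made with the initial weights rather than the trained ones.

```scala
object PairOutput {
  // Hypothetical stand-in for predictOnValues: the key is passed through
  // unchanged and only the feature vector is replaced by w . x.
  def predictOnValues[K](data: Seq[(K, Array[Double])], weights: Array[Double]): Seq[(K, Double)] =
    data.map { case (key, x) => (key, weights.zip(x).map { case (w, xi) => w * xi }.sum) }

  def main(args: Array[String]): Unit = {
    // Same rows as the test file: label v - 1, features (v, v, v)
    val test = (2 to 10).map(v => ((v - 1).toDouble, Array.fill(3)(v.toDouble)))
    // Initial weights, as with Vectors.zeros(3)
    predictOnValues(test, Array.fill(3)(0.0)).foreach(println)
  }
}
```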
If the file content is instead:
(0.0,[2.0,2.0,2.0])
(0.0,[3.0,3.0,3.0])
(0.0,[4.0,4.0,4.0])
(0.0,[5.0,5.0,5.0])
(0.0,[6.0,6.0,6.0])
(0.0,[7.0,7.0,7.0])
(0.0,[8.0,8.0,8.0])
(0.0,[9.0,9.0,9.0])
(0.0,[10.0,10.0,10.0])
the answer is:
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
I expected to get the label predicted by the model.
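One hypothesis, not verified here against the Spark 1.2.1 source: if predictOnValues builds its prediction function from the model object that exists when the DStream is defined, it would keep using the initial Vectors.zeros(numFeatures) weights even after trainOn updates the model, and would print 0.0 for every row. The plain-Scala sketch below, with hypothetical Model and Wrapper classes, shows how this can happen with a Scala method value: `model.predict` evaluates `model` once, when the function is created, while an explicit lambda re-reads it on each call.

```scala
object StaleModelDemo {
  // Hypothetical stand-in for a linear model (no intercept).
  final case class Model(weights: Array[Double]) {
    def predict(x: Array[Double]): Double =
      weights.zip(x).map { case (w, xi) => w * xi }.sum
  }

  // Hypothetical stand-in for a streaming wrapper that replaces its model
  // after each training batch.
  class Wrapper {
    var model: Model = Model(Array.fill(3)(0.0))
    // Method value: `model` is evaluated here, so the function keeps the
    // model that existed at creation time.
    def staleFn: Array[Double] => Double = model.predict
    // Explicit lambda: `model` is read again on every call.
    def freshFn: Array[Double] => Double = x => model.predict(x)
  }

  def main(args: Array[String]): Unit = {
    val w = new Wrapper
    val stale = w.staleFn
    val fresh = w.freshFn
    // Simulate training replacing the model with the logged weights
    w.model = Model(Array.fill(3)(7.333333333333333))
    val x = Array(2.0, 2.0, 2.0)
    println(s"stale: ${stale(x)}, fresh: ${fresh(x)}")
  }
}
```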
--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480