Hi,

The model weights are not updating in streaming linear regression. Below are the code and data I am running.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

// One local thread, 25-second batches
val conf = new SparkConf().setMaster("local[1]").setAppName("1feature")
val ssc = new StreamingContext(conf, Seconds(25))

// Each line of a new file in these directories is parsed as "(label,[f1,f2,f3])"
val trainingData = ssc.textFileStream("file:///data/TrainStreamDir").map(LabeledPoint.parse)
val testData = ssc.textFileStream("file:///data/TestStreamDir").map(LabeledPoint.parse)

// Start from an all-zero weight vector with one entry per feature
val numFeatures = 3
val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))

// Update the model on every training batch; print (label, prediction) pairs for test batches
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()

Sample data in TrainStreamDir:

(10240,[1,21,0])
(9936,[2,21,15])
(10118,[3,21,30])
(10174,[4,21,45])
(10460,[5,22,0])
(9961,[6,22,15])
(10372,[7,22,30])
(10666,[8,22,45])
(10300,[9,23,0])
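Each line is in the `(label,[f1,f2,f3])` text form that `LabeledPoint.parse` reads, so every record should carry three features, matching `numFeatures = 3`. Below is a quick plain-Scala check of that with no Spark dependency; `featureCount` is a hypothetical helper written only for this sketch, not an MLlib function, and it assumes dense vectors in square brackets.

```scala
object FeatureCountCheck {
  // Hypothetical helper (not part of MLlib): counts the comma-separated
  // values inside the square brackets of a "(label,[f1,f2,...])" line.
  def featureCount(line: String): Int = {
    val start = line.indexOf('[')
    val end = line.indexOf(']')
    require(start >= 0 && end > start, s"not a dense labeled point: $line")
    line.substring(start + 1, end).split(',').count(_.trim.nonEmpty)
  }

  // A few lines copied from the sample data above
  val sample = Seq(
    "(10240,[1,21,0])",
    "(9936,[2,21,15])",
    "(10300,[9,23,0])"
  )

  def main(args: Array[String]): Unit = {
    val numFeatures = 3
    sample.foreach(line => println(s"$line -> ${featureCount(line)} features"))
    // Every record should match the size of the initial weight vector
    assert(sample.forall(featureCount(_) == numFeatures))
  }
}
```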

Sample of the output:
14/11/10 15:52:55 INFO scheduler.JobScheduler: Added jobs for time 1415652775000 ms
14/11/10 15:52:55 INFO scheduler.JobScheduler: Starting job streaming job 1415652775000 ms.0 from job set of time 1415652775000 ms
14/11/10 15:52:55 INFO spark.SparkContext: Starting job: count at GradientDescent.scala:162
14/11/10 15:52:55 INFO spark.SparkContext: Job finished: count at GradientDescent.scala:162, took 3.1689E-5 s
14/11/10 15:52:55 INFO optimization.GradientDescent: GradientDescent.runMiniBatchSGD returning initial weights, no data found
14/11/10 15:52:55 INFO regression.StreamingLinearRegressionWithSGD: Model updated at time 1415652775000 ms
14/11/10 15:52:55 INFO regression.StreamingLinearRegressionWithSGD: Current model: weights, [0.0,0.0,0.0]
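The line "returning initial weights, no data found" suggests each 25-second training batch is empty, so the weights never move from zero. My understanding, which I have not confirmed for this setup, is that `textFileStream` only processes files that appear in the directory after `ssc.start()`, and that they should appear atomically, e.g. by being moved in from another directory on the same file system; files already in the directory are skipped. Below is a sketch of dropping a file in that way using plain `java.nio`. The staging directory and the helper name `dropIntoStream` are hypothetical.

```scala
import java.nio.file.{Files, Path, Paths, StandardCopyOption}

object DropIntoStream {
  // Hypothetical helper: write the file in a staging directory on the same
  // file system, then rename it into the monitored directory in one step,
  // so the stream never sees a half-written file.
  def dropIntoStream(lines: Seq[String], staging: Path, watched: Path, name: String): Path = {
    Files.createDirectories(staging)
    Files.createDirectories(watched)
    val tmp = staging.resolve(name)
    Files.write(tmp, lines.mkString("\n").getBytes("UTF-8"))
    Files.move(tmp, watched.resolve(name), StandardCopyOption.ATOMIC_MOVE)
  }

  def main(args: Array[String]): Unit = {
    val staging = Paths.get("/data/staging") // hypothetical; must share a file system with /data
    val watched = Paths.get("/data/TrainStreamDir")
    // Run this while the streaming job is already running (after ssc.start())
    val target = dropIntoStream(
      Seq("(10240,[1,21,0])", "(9936,[2,21,15])"),
      staging, watched, s"train-${System.currentTimeMillis()}.txt")
    println(s"moved to $target")
  }
}
```

Because the file is freshly written just before the move, its modification time is current, which should keep it from being treated as an old file.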

Thanks
Tri
