Hi,

Maybe I am doing something wrong, but I suspect that linear regression behaves
differently in Spark 1.0 compared to Spark 0.9.

I have the following data points:

23 9515 7 2.58 113 0.77 0.964 9.5 9
22 9830 8 1.15 126 0.38 0.964 9.5 9
14 10130 9 0.81 129 0.74 0.827 9.6 9
10 10250 11 0.95 87 0.15 0.976 9.7 9
16 10390 12 1.02 78 0.24 0.984 9.7 9
19 10500 12 1.69 81 0.61 0.984 9.7 9.1
13 10575 12 1.56 81 0.73 0.984 9.7 9.2
16.6 10840 13 1.63 67 0.38 0.932 9.8 9.3
15.9 10960 13 1.83 65 0.57 0.878 9.8 9.4
15.7 11060 13 2.03 69 0.72 0.878 9.8 9.5
14 11475 15 1.69 77 0.2 0.887 10.3 9.5
13.5 11775 18 2.31 58 0.12 0.852 11.8 10.1
6.2 11940 21 2.26 67 0.2 0.976 15.3 12.4
9.6 12070 22 2.07 84 0.08 0.993 15.7 13
15.5 12315 22 3.11 69 0.4 1.185 16.6 14.4
31.4 12900 23 2.82 85 0.42 1.15 16.7 15.9
42.7 12975 24 3.48 77 0.17 1.221 16.7 16.1
38.6 13055 24 3.29 75 0.29 1.161 16.8 16.2
43.4 13250 24 2.82 76 0.43 1.161 16.8 16.2
12.5 13795 25 1.6 81 0.56 0.272 16.8 16.2
21.1 14010 26 1.04 75 0.46 0.201 16.8 16.2
19 14455 28 1.76 64 0.16 0.748 16.9 16.2
18.7 14695 28 2 76 0.27 0.819 17.1 16.2
20.2 14905 29 2.35 75 0.33 0.419 17.2 16.4
27.1 15350 30 2.12 85 0.31 1.29 17 16.5
14.8 15740 30 2.35 78 0.81 0.802 17.3 16.5
12.6 16155 32 2.47 80 0.12 0.67 17.9 16.5
14.9 16325 32 3.76 81 0.5 0.532 17.5 16.6
13.8 17060 34 3.76 65 0.91 0.748 17.6 16.6
9 20265 40 3.41 60 0.01 0.512 17.7 16.6

I used these data points to train a RidgeRegressionWithSGD model in Spark 0.9. I have
since decided to upgrade to Spark 1.0 and only modified the RDD[Array[Double]] to
use Vectors, as explained in the Spark docs. However, leaving all other
parameters the same, I do not get the same model weights and overall
prediction. In fact, with Spark 1.0 I get weights of NaN, NaN, NaN, ..., NaN.

Can somebody help? I really need to get this working as it was, or at least
understand whether there are other things I need to change to get it working as
before. I am writing an entire master's thesis based on Spark.

Here is my Spark 1.0 code; all that was changed is that the LabeledPoint is now
constructed with a Vector rather than the Array[Double] it took in 0.9.
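
To illustrate, the change amounts to roughly the following (the 0.9 line is from
memory, so treat it as approximate):

  // Spark 0.9: LabeledPoint took the raw Array[Double] of features
  // LabeledPoint(transformedData.head, transformedData.tail)

  // Spark 1.0: the features are wrapped in an mllib Vector first
  LabeledPoint(transformedData.head, Vectors.dense(transformedData.tail))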

import scala.math._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.SquaredL2Updater
import org.apache.spark.mllib.regression._
import org.apache.spark.rdd.RDD

object TestRegression{

  def main(args: Array[String]) {

    if (args.length < 2) { // the data file is expected as the second argument
      println("Please supply data/file")
      System.exit(1)
    }
    val sc = getSparkContext()
    val wData = sc.textFile(args(1)) //read in supplied file

    val data = wData.map(_.split(" ").map(_.toDouble)).cache() // read one line at a time, tokenizing on spaces
    val trainingSet = data.map { row => // transform one row at a time
      val transformedData = BYModel(row)
      // prepare the label and feature vector for the regression
      LabeledPoint(transformedData.head, Vectors.dense(transformedData.tail))
    }.cache()

    println(data.collect.map(_.mkString(" ")).mkString("\n"))
    println("-------------------------------")
    println(trainingSet.collect.mkString("\n"))


    val logAlg = new RidgeRegressionWithSGD()
    // parameters for the gradient descent optimizer
    logAlg.optimizer.setNumIterations(100).setRegParam(0.1).setStepSize(0.1)
    val model = logAlg.run(trainingSet)
    println("weights: "+model.weights.toArray.mkString(","))
    println("Mean Squared Error: "+calculateMeanSquaredError(model,
trainingSet))
    calculateROP(model, data)
    sc.stop()
  }


  def getSparkContext(): SparkContext = {
    //get spark context
    //create spark context config
    val conf = new SparkConf().
      setMaster("local").
      setAppName("TestRegression").
      setSparkHome("SPARK_HOME").
      set("spark.executor.memory", "512m").
      set("spark.cleaner.ttl", "3600")
    new SparkContext(conf)

  }

  def BYModel(params: Array[Double]): Array[Double] = {
    import scala.math.log
    val y = log(params(0))
    val x2 = 8000 - params(1)
    val x3 = pow(params(1), 0.69) * (params(8) - 9)
    val x4 = params(1) * (params(8) - params(7))
    val x5 = log((params(3) - 0.02) / (4 - 0.02))
    val x6 = log(params(4) / 60)
    val x7 = params(5) * (-1)
    val x8 = params(6)
    Array(y, x2, x3, x4, x5, x6, x7, x8)

  }

  def calculateROP(model: GeneralizedLinearModel, data: RDD[Array[Double]]) {
    import scala.math.log
    
    val constants = model.weights.toArray 
    val a1 = model.intercept
    val a2 = constants(0)
    val a3 = constants(1)
    val a4 = constants(2)
    val a5 = constants(3)
    val a6 = constants(4)
    val a7 = constants(5)
    val a8 = constants(6)

    val rop = data.map {
      row =>

        exp(a1 + (a2 * (8000 - row(1))) 
          + (a3 * (pow(row(1), 0.69) * (row(8) - 9))) 
          + (a4 * (row(1) * (row(8) - row(7)))) 
          + a5 * log((row(3) - 0.02) / (4 - 0.02)) 
          + a6 * log(row(4) / 60) 
          + a7 * (row(5) * (-1)) 
          + a8 * row(6)) 

    }
    // rop.saveAsTextFile("file:///c:/tools/data/rop_calc.txt") // write file for plotting
  }

  def calculateMeanSquaredError(model: GeneralizedLinearModel, data: RDD[LabeledPoint]) = {
    val valueAndPreds = data.map { point =>
      val prediction = model.predict(point.features)
      (point.label, prediction)
    }
    valueAndPreds.map{case(v, p) => math.pow((v-p), 2)}.mean()

  }


}


Best regards,

Osas


