Hi Xiangrui,

Here is the class:


object ALSNew {

 def main (args: Array[String]) {
     val conf = new SparkConf()
      .setAppName("TrainingDataPurchase")
      .set("spark.executor.memory", "4g")
      
      conf.set("spark.shuffle.memoryFraction","0.65") //default is 0.2  
    conf.set("spark.storage.memoryFraction","0.3")//default is 0.6 
    
    
    val sc = new SparkContext(conf) 
     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    
     val pfile = args(0)
     val purchase=sc.textFile(pfile)
   

    val ratings = purchase.map ( line =>        
    line.split(',') match { case Array(user, item, rate) =>
    (user.toInt, item.toInt, rate.toFloat)
    }).toDF()
  
    
        val rank = args(1).toInt
        val numIterations = args(2).toInt
        val regParam : Double = 0.01
        val implicitPrefs : Boolean = true
        val numUserBlocks : Int = 100
        val numItemBlocks : Int = 100
        val nonnegative : Boolean = true
        
        //val paramMap = ParamMap (regParam=0.01)
        //paramMap.put(numUserBlocks=100,  numItemBlocks=100)
   val als = new ALS()
       .setRank(rank)
      .setRegParam(regParam)
      .setImplicitPrefs(implicitPrefs)
      .setNumUserBlocks(numUserBlocks)
      .setNumItemBlocks(numItemBlocks)
      
     
    val alpha = als.getAlpha
  
       
  val model =  als.fit(ratings)
  
  
  val predictions = model.transform(ratings)
      .select("rating", "prediction")
      .map { case Row(rating: Float, prediction: Float) =>
        (rating.toDouble, prediction.toDouble)
      }
    val rmse =
      if (implicitPrefs) {
        // TODO: Use a better (rank-based?) evaluation metric for implicit 
feedback.
        // We limit the ratings and the predictions to interval [0, 1] and 
compute the weighted RMSE
        // with the confidence scores as weights.
        val (totalWeight, weightedSumSq) = predictions.map { case (rating, 
prediction) =>
          val confidence = 1.0 + alpha * math.abs(rating)
          val rating01 = math.max(math.min(rating, 1.0), 0.0)
          val prediction01 = math.max(math.min(prediction, 1.0), 0.0)
          val err = prediction01 - rating01
          (confidence, confidence * err * err)
        }.reduce { case ((c0, e0), (c1, e1)) =>
          (c0 + c1, e0 + e1)
        }
        math.sqrt(weightedSumSq /totalWeight)
      } else {
        val mse = predictions.map { case (rating, prediction) =>
          val err = rating - prediction
          err * err
        }.mean()
        math.sqrt(mse)
      }
    
    println("Mean Squared Error = " + rmse)
 }
 
 
 
 }




I am using the following in my maven build (pom.xml): 


<dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>1.3.0</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.11</artifactId>
        <version>1.3.0</version>
   </dependency>
   <dependency>
   <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>1.3.0</version>
   </dependency>
  </dependencies>


I am using scala version 2.11.2.

Could it be that "spark-1.3.0-bin-hadoop2.4.tgz requires  a different version 
of scala ?

Thanks,
Jay



On Apr 9, 2015, at 4:38 PM, Xiangrui Meng <men...@gmail.com> wrote:

> Could you share ALSNew.scala? Which Scala version did you use? -Xiangrui
> 
> On Wed, Apr 8, 2015 at 4:09 PM, Jay Katukuri <jkatuk...@apple.com> wrote:
>> Hi Xiangrui,
>> 
>> I tried running this on my local machine  (laptop) and got the same error:
>> 
>> Here is what I did:
>> 
>> 1. downloaded spark 1.30 release version (prebuilt for hadoop 2.4 and later)
>> "spark-1.3.0-bin-hadoop2.4.tgz".
>> 2. Ran the following command:
>> 
>> spark-submit --class ALSNew  --master local[8] ALSNew.jar  /input_path
>> 
>> 
>> The stack trace is exactly same.
>> 
>> Thanks,
>> Jay
>> 
>> 
>> 
>> On Apr 8, 2015, at 10:47 AM, Jay Katukuri <jkatuk...@apple.com> wrote:
>> 
>> some additional context:
>> 
>> Since, I am using features of spark 1.3.0, I have downloaded spark 1.3.0 and
>> used spark-submit from there.
>> The cluster is still on spark-1.2.0.
>> 
>> So, this looks to me that at runtime, the executors could not find some
>> libraries of spark-1.3.0, even though I ran spark-submit from my downloaded
>> spark-1.30.
>> 
>> 
>> 
>> On Apr 6, 2015, at 1:37 PM, Jay Katukuri <jkatuk...@apple.com> wrote:
>> 
>> Here is the command that I have used :
>> 
>> spark-submit —class packagename.ALSNew --num-executors 100 --master yarn
>> ALSNew.jar -jar spark-sql_2.11-1.3.0.jar hdfs://input_path
>> 
>> Btw - I could run the old ALS in mllib package.
>> 
>> 
>> 
>> 
>> 
>> On Apr 6, 2015, at 12:32 PM, Xiangrui Meng <men...@gmail.com> wrote:
>> 
>> So ALSNew.scala is your own application, did you add it with
>> spark-submit or spark-shell? The correct command should like
>> 
>> spark-submit --class your.package.name.ALSNew ALSNew.jar [options]
>> 
>> Please check the documentation:
>> http://spark.apache.org/docs/latest/submitting-applications.html
>> 
>> -Xiangrui
>> 
>> On Mon, Apr 6, 2015 at 12:27 PM, Jay Katukuri <jkatuk...@apple.com> wrote:
>> 
>> Hi,
>> 
>> Here is the stack trace:
>> 
>> 
>> Exception in thread "main" java.lang.NoSuchMethodError:
>> scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
>> at ALSNew$.main(ALSNew.scala:35)
>> at ALSNew.main(ALSNew.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:483)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> 
>> 
>> Thanks,
>> Jay
>> 
>> 
>> 
>> On Apr 6, 2015, at 12:24 PM, Xiangrui Meng <men...@gmail.com> wrote:
>> 
>> Please attach the full stack trace. -Xiangrui
>> 
>> On Mon, Apr 6, 2015 at 12:06 PM, Jay Katukuri <jkatuk...@apple.com> wrote:
>> 
>> 
>> Hi all,
>> 
>> I got a runtime error while running the ALS.
>> 
>> Exception in thread "main" java.lang.NoSuchMethodError:
>> scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
>> 
>> 
>> The error that I am getting is at the following code:
>> 
>> val ratings = purchase.map ( line =>
>>  line.split(',') match { case Array(user, item, rate) =>
>>  (user.toInt, item.toInt, rate.toFloat)
>>  }).toDF()
>> 
>> 
>> Any help is appreciated !
>> 
>> I have tried passing the spark-sql jar using the -jar
>> spark-sql_2.11-1.3.0.jar
>> 
>> Thanks,
>> Jay
>> 
>> 
>> 
>> On Mar 17, 2015, at 12:50 PM, Xiangrui Meng <men...@gmail.com> wrote:
>> 
>> Please remember to copy the user list next time. I might not be able
>> to respond quickly. There are many others who can help or who can
>> benefit from the discussion. Thanks! -Xiangrui
>> 
>> On Tue, Mar 17, 2015 at 12:04 PM, Jay Katukuri <jkatuk...@apple.com> wrote:
>> 
>> Great Xiangrui. It works now.
>> 
>> Sorry that I needed to bug you :)
>> 
>> Jay
>> 
>> 
>> On Mar 17, 2015, at 11:48 AM, Xiangrui Meng <men...@gmail.com> wrote:
>> 
>> Please check this section in the user guide:
>> http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection
>> 
>> You need `import sqlContext.implicits._` to use `toDF()`.
>> 
>> -Xiangrui
>> 
>> On Mon, Mar 16, 2015 at 2:34 PM, Jay Katukuri <jkatuk...@apple.com> wrote:
>> 
>> Hi Xiangrui,
>> Thanks a lot for the quick reply.
>> 
>> I am still facing an issue.
>> 
>> I have tried the code snippet that you have suggested:
>> 
>> val ratings = purchase.map { line =>
>> line.split(',') match { case Array(user, item, rate) =>
>> (user.toInt, item.toInt, rate.toFloat)
>> }.toDF("user", "item", "rate”)}
>> 
>> for this, I got the below error:
>> 
>> error: ';' expected but '.' found.
>> [INFO] }.toDF("user", "item", "rate”)}
>> [INFO]  ^
>> 
>> when I tried below code
>> 
>> val ratings = purchase.map ( line =>
>> line.split(',') match { case Array(user, item, rate) =>
>> (user.toInt, item.toInt, rate.toFloat)
>> }).toDF("user", "item", "rate")
>> 
>> 
>> error: value toDF is not a member of org.apache.spark.rdd.RDD[(Int, Int,
>> Float)]
>> [INFO] possible cause: maybe a semicolon is missing before `value toDF'?
>> [INFO]     }).toDF("user", "item", "rate")
>> 
>> 
>> 
>> I have looked at the document that you have shared and tried the following
>> code:
>> 
>> case class Record(user: Int, item: Int, rate:Double)
>> val ratings = purchase.map(_.split(',')).map(r =>Record(r(0).toInt,
>> r(1).toInt, r(2).toDouble)) .toDF("user", "item", "rate")
>> 
>> for this, I got the below error:
>> 
>> error: value toDF is not a member of org.apache.spark.rdd.RDD[Record]
>> 
>> 
>> Appreciate your help !
>> 
>> Thanks,
>> Jay
>> 
>> 
>> On Mar 16, 2015, at 11:35 AM, Xiangrui Meng <men...@gmail.com> wrote:
>> 
>> Try this:
>> 
>> val ratings = purchase.map { line =>
>> line.split(',') match { case Array(user, item, rate) =>
>> (user.toInt, item.toInt, rate.toFloat)
>> }.toDF("user", "item", "rate")
>> 
>> Doc for DataFrames:
>> http://spark.apache.org/docs/latest/sql-programming-guide.html
>> 
>> -Xiangrui
>> 
>> On Mon, Mar 16, 2015 at 9:08 AM, jaykatukuri <jkatuk...@apple.com> wrote:
>> 
>> Hi all,
>> I am trying to use the new ALS implementation under
>> org.apache.spark.ml.recommendation.ALS.
>> 
>> 
>> 
>> The new method to invoke for training seems to be  override def fit(dataset:
>> DataFrame, paramMap: ParamMap): ALSModel.
>> 
>> How do I create a dataframe object from ratings data set that is on hdfs ?
>> 
>> 
>> where as the method in the old ALS implementation under
>> org.apache.spark.mllib.recommendation.ALS was
>> def train(
>> ratings: RDD[Rating],
>> rank: Int,
>> iterations: Int,
>> lambda: Double,
>> blocks: Int,
>> seed: Long
>> ): MatrixFactorizationModel
>> 
>> My code to run the old ALS train method is as below:
>> 
>> "val sc = new SparkContext(conf)
>> 
>> val pfile = args(0)
>> val purchase=sc.textFile(pfile)
>> val ratings = purchase.map(_.split(',') match { case Array(user, item,
>> rate) =>
>>   Rating(user.toInt, item.toInt, rate.toInt)
>> })
>> 
>> val model = ALS.train(ratings, rank, numIterations, 0.01)"
>> 
>> 
>> Now, for the new ALS fit method, I am trying to use the below code to run,
>> but getting a compilation error:
>> 
>> val als = new ALS()
>>  .setRank(rank)
>> .setRegParam(regParam)
>> .setImplicitPrefs(implicitPrefs)
>> .setNumUserBlocks(numUserBlocks)
>> .setNumItemBlocks(numItemBlocks)
>> 
>> val sc = new SparkContext(conf)
>> 
>> val pfile = args(0)
>> val purchase=sc.textFile(pfile)
>> val ratings = purchase.map(_.split(',') match { case Array(user, item,
>> rate) =>
>>   Rating(user.toInt, item.toInt, rate.toInt)
>> })
>> 
>> val model = als.fit(ratings.toDF())
>> 
>> I get an error that the method toDF() is not a member of
>> org.apache.spark.rdd.RDD[org.apache.spark.ml.recommendation.ALS.Rating[Int]].
>> 
>> Appreciate the help !
>> 
>> Thanks,
>> Jay
>> 
>> 
>> 
>> 
>> 
>> 
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-to-DataFrame-for-using-ALS-under-org-apache-spark-ml-recommendation-ALS-tp22083.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 

Reply via email to