Please help me get started on Apache Spark
Friends, I am pretty new to Spark, and just as new to Scala, MLlib, and the entire Hadoop stack!! It would be a great help to be pointed to some good books on Spark and MLlib. Further, does MLlib support any algorithms for B2B cross-sell/upsell or customer retention (out of the box, preferably) that I could run on my Salesforce data? I am currently using collaborative filtering, but that is essentially B2C. Thanks in advance!! Regards, Saurabh Agrawal
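A note on the retention part of the question: MLlib does not ship a dedicated cross-sell or retention model, but customer retention (churn) is commonly framed as binary classification, which MLlib does cover out of the box (for example logistic regression). The sketch below is illustrative only; the file path, field layout, and feature choices are hypothetical.

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Hypothetical CSV: label (1.0 = churned, 0.0 = retained) followed by numeric
// features such as tenure, order count, and spend, extracted from the CRM data.
val churnData = sc.textFile("/path/churn.csv").map { line =>
  val fields = line.split(',').map(_.trim)
  LabeledPoint(fields(0).toDouble, Vectors.dense(fields.drop(1).map(_.toDouble)))
}

val Array(train, test) = churnData.randomSplit(Array(0.7, 0.3), seed = 11L)
val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(train)

// Simple holdout accuracy as a first sanity check.
val accuracy = test.map(p => (model.predict(p.features), p.label))
  .filter { case (pred, label) => pred == label }
  .count().toDouble / test.count()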
Writing collection to file error
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import java.io.{File, PrintWriter}

// Load and parse the data
val data = sc.textFile("/path/CFReady.txt")
val ratings = data.map(_.split('\t') match {
  case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
})

// Build the recommendation model using ALS
val rank = 50
val numIterations = 100
val model = ALS.train(ratings, rank, numIterations, 0.10)

// Evaluate the model on rating data
val usersProducts = ratings.map { case Rating(user, product, rate) => (user, product) }
val predictions = model.predict(usersProducts).map { case Rating(user, product, rate) => ((user, product), rate) }
val ratesAndPreds = ratings.map { case Rating(user, product, rate) => ((user, product), rate) }.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
  val err = r1 - r2
  err * err
}.mean()
println("Mean Squared Error = " + MSE)

val pw = new PrintWriter(new File("/path/CFOutput.txt"))
ratesAndPreds.foreach(pw.println)

Hi, Consider the code above: I am trying to write the output of the ratesAndPreds array to disk (the last two lines), but I get the error below.

Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:758)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:36)
at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:41)
at $iwC$$iwC$$iwC$$iwC.init(console:43)
at $iwC$$iwC$$iwC.init(console:45)
at $iwC$$iwC.init(console:47)
at $iwC.init(console:49)
at init(console:51)
at .init(console:55)
at .clinit(console)
at .init(console:7)
at .clinit(console)
at $print(console)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: java.io.PrintWriter
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
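For context on the error itself: the PrintWriter is created on the driver and captured by the foreach closure, and Spark then fails while trying to serialize that closure for shipping to executors (hence the NotSerializableException on java.io.PrintWriter). One minimal workaround sketch, with placeholder paths, is to create the writer inside the closure so nothing non-serializable crosses the driver/executor boundary; note that this writes to each executor's local filesystem, so it is mostly useful in local mode. The reply in the next message shows the more idiomatic fix, saveAsTextFile.

import java.io.{File, PrintWriter}

// Sketch only: one writer per partition, created on the executor, so no
// non-serializable object is captured from the driver.
ratesAndPreds.foreachPartition { iter =>
  val pw = new PrintWriter(new File(s"/path/CFOutput-part-${java.util.UUID.randomUUID}.txt"))
  try iter.foreach(pw.println) finally pw.close()
}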
RE: Writing collection to file error
Thanks for your help Akhil; however, this is creating an output folder and storing the result set in multiple files. Also, the record count in the result set seems to have multiplied!! Is there any other way to achieve this? Thanks!! Regards, Saurabh Agrawal

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Monday, November 24, 2014 5:55 PM
To: Saurabh Agrawal
Cc: user@spark.apache.org
Subject: Re: Writing collection to file error

Hi Saurabh, here your ratesAndPreds is an RDD of type [((Int, Int), (Double, Double))], not an Array. Now, if you want to save it to disk, you can simply call saveAsTextFile and provide the location. So change your last line from this:
ratesAndPreds.foreach(pw.println)
to this:
ratesAndPreds.saveAsTextFile("/path/CFOutput")
Thanks, Best Regards
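On the follow-up points: saveAsTextFile always writes a directory with one part file per partition, and the apparent record multiplication usually comes from the join emitting one pair per matching key occurrence, so duplicate (user, product) ratings multiply the output. Two hedged options for ending up with a single file, with placeholder paths:

// Option 1: collapse to a single partition before saving. This still writes a
// directory, but it contains just one part-00000 file. Only sensible when the
// data is small enough to pass through one task.
ratesAndPreds.coalesce(1).saveAsTextFile("/path/CFOutputSingle")

// Option 2: bring the results back to the driver and use plain Java IO there.
// The PrintWriter never enters a Spark closure, so the earlier serialization
// error cannot occur. Again, only safe for small result sets.
import java.io.{File, PrintWriter}
val pw = new PrintWriter(new File("/path/CFOutput.txt"))
try ratesAndPreds.collect().foreach(pw.println) finally pw.close()

If duplicate keys are inflating the counts, de-duplicating before the join (for example ratings.map { case Rating(u, p, r) => ((u, p), r) }.distinct()) keeps the output in line with the input.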
ALS train error
Hi, I am getting the following error from this call:

val model = ALS.train(ratings, rank, numIterations, 0.01)

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 103.0 failed 1 times, most recent failure: Lost task 1.0 in stage 103.0 (TID 3, localhost): scala.MatchError: [Ljava.lang.String;@4837e797 (of class [Ljava.lang.String;)
$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:16)
$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:16)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks!! Regards, Saurabh Agrawal
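The scala.MatchError on [Ljava.lang.String (an Array[String]) suggests that the single pattern Array(user, item, rate) did not match, i.e. at least one input line does not split into exactly three tab-separated fields; a header row, a blank line, or a stray delimiter are the usual suspects. A minimal defensive-parsing sketch, assuming the same /path/CFReady.txt layout as in the earlier posts:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

val data = sc.textFile("/path/CFReady.txt")

// flatMap plus an explicit wildcard case: malformed lines simply produce no Rating
// instead of failing the stage with a MatchError.
val ratings = data.flatMap { line =>
  line.split('\t') match {
    case Array(user, item, rate) => Some(Rating(user.toInt, item.toInt, rate.toDouble))
    case _                       => None // consider counting or logging these lines
  }
}

// Note: toInt/toDouble still throw on non-numeric fields (e.g. a header row);
// wrap the conversions in scala.util.Try if that can occur in the data.
val model = ALS.train(ratings, rank, numIterations, 0.01)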
Building Desktop application for ALS-MLlib / Training ALS
Hi, I am a newbie in the Spark and Scala world. I have been trying to implement collaborative filtering using the MLlib library supplied out of the box with Spark and Scala, and I have two problems:
1. The best model was trained with rank = 20, lambda = 5.0, and numIter = 10, and its RMSE on the test set is 25.718710831912485. The best model improves on the baseline by 18.29%. Is there a systematic way in which the RMSE could be brought down, and what is a decent, acceptable value for RMSE?
2. I picked up the collaborative filtering code from http://ampcamp.berkeley.edu/5/exercises/movie-recommendation-with-mllib.html and executed it with my own dataset. Now I want to build a desktop application around it.
a. What is the best language to do this, Java or Scala? Is there any possibility of doing this in C#?
b. Can somebody please share any relevant documents, sources, or helper links to help me get started on this?
Your help is greatly appreciated. Thanks!! Regards, Saurabh Agrawal
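On question 1, there is no universally "good" RMSE; it is only meaningful relative to the rating scale and to the baseline. The usual way to push it down is to hold out a validation split and search over rank, lambda, and the iteration count. The sketch below is a hedged illustration, assuming a ratings RDD[Rating] like the one in the earlier posts; the candidate values are illustrative, not recommendations.

import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

// Helper: RMSE of a model on a held-out rating set.
def rmse(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
  val predictions = model.predict(data.map(r => (r.user, r.product)))
    .map(r => ((r.user, r.product), r.rating))
  val joined = data.map(r => ((r.user, r.product), r.rating)).join(predictions)
  math.sqrt(joined.map { case (_, (actual, predicted)) =>
    val err = actual - predicted
    err * err
  }.mean())
}

val Array(training, validation) = ratings.randomSplit(Array(0.8, 0.2), seed = 42L)

// Small grid over the three ALS knobs; keep the combination with the lowest
// validation RMSE, then confirm it once on a separate test split.
val candidates = for {
  rank    <- Seq(10, 20, 50)
  lambda  <- Seq(0.01, 0.1, 1.0, 5.0)
  numIter <- Seq(10, 20)
} yield (rank, lambda, numIter)

val (bestParams, bestRmse) = candidates.map { case (rank, lambda, numIter) =>
  ((rank, lambda, numIter), rmse(ALS.train(training, rank, numIter, lambda), validation))
}.minBy(_._2)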
Calling ALS-MLlib from desktop application / Training ALS
Requesting guidance on my queries in the email trail below.
-----Original Message-----
From: Saurabh Agrawal
Sent: Saturday, December 13, 2014 07:06 PM GMT Standard Time
To: user@spark.apache.org
Subject: Building Desktop application for ALS-MLlib / Training ALS
Spark Trainings/ Professional certifications
Hi, Can you please suggest some of the best available trainings/coaching and professional certifications for Apache Spark? We are trying to run predictive analysis on our sales data and come up with recommendations (leads). We have tried to run CF, but we end up getting absolutely bogus results!! A training that would leave us hands-on enough to do our job effectively is what we are after. In addition, if the training could provide a firm grounding for a professional certification, that would be an added advantage. Thanks for your inputs. Regards, Saurabh Agrawal
RE: how to read lz4 compressed data using fileStream of spark streaming?
How do I unsubscribe from this mailing list, please? Thanks!! Regards, Saurabh Agrawal
Re: Scala/Python or Java
Greetings, I am also a beginner and currently learning Spark. I found the Python + Spark combination the easiest to learn given my past experience with Python, but yes, it depends on the user. Here is some reference documentation: https://spark.apache.org/docs/latest/programming-guide.html Regards, -Saurabh www.rideondata.com
On Thu, Jun 25, 2015 at 1:06 PM, Ted Yu yuzhih...@gmail.com wrote: The answer depends on the user's experience with these languages as well as the most commonly used language in the production environment. Learning Scala requires some time. If you're very comfortable with Java / Python, you can go with that while at the same time familiarizing yourself with Scala. Cheers
On Thu, Jun 25, 2015 at 12:04 PM, spark user spark_u...@yahoo.com.invalid wrote: Hi All, I am new to Spark; I just want to know which technology is good/best for Spark learning: 1) Scala 2) Java 3) Python. I know Spark supports all three languages, but which one is best? Thanks, su
-- Regards, Saurabh S. Agrawal http://saurabhska.wordpress.com/
[Streaming (DStream)]: Does Spark Streaming support pause/resume of message consumption from Kafka?
Hi Spark Team, I am using Spark 3.4.0 in an application that consumes messages from Kafka topics. I have the queries below.
1. Does DStream support pausing/resuming message consumption at runtime on a particular condition? If yes, please provide details.
2. I tried to revoke a partition from the consumer at runtime, which caused an error. The exception is thrown here:
throw new IllegalStateException(s"Previously tracked partitions " +
  s"${revokedPartitions.mkString("[", ",", "]")} been revoked by Kafka because of consumer " +
  s"rebalance. This is mostly due to another stream with same group id joined, " +
  s"please check if there're different streaming application misconfigure to use same " +
  s"group id. Fundamentally different stream should use different group id")
3. Does Spark support blue/green deployment? I need to implement a blue/green deployment scenario with Spark, and I am facing a problem because both the blue and the green deployment need to run with the same consumer group id. As I read, Spark does not support two deployments with the same consumer group id, so this implementation is failing. Please guide how this can be implemented with Spark.
4. Does Spark support active-active deployment?
It would be great if you could reply to the above queries.
--
Regards, Saurabh Agrawal
Software Development Specialist, IPaaS R
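On query 1, the DStream API has no built-in pause/resume switch. Two workarounds commonly used are (a) gating the work inside foreachRDD on an external flag so batches become no-ops while "paused", and (b) stopping the StreamingContext gracefully and building a fresh one when consumption should resume. Below is a minimal sketch of the second approach; the app name, batch interval, and the stream-construction body are placeholders, and offsets are assumed to resume from the consumer group's last committed position.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc = new SparkContext(new SparkConf().setAppName("kafka-dstream-pause-resume"))

// Factory: a StreamingContext cannot be restarted once stopped, so a fresh one is
// built (on the same SparkContext) every time consumption should resume.
def createStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(10))
  // ... create the Kafka direct stream and the processing pipeline here ...
  ssc
}

var ssc = createStreamingContext()
ssc.start()

// "Pause": let in-flight batches finish, but keep the shared SparkContext alive.
ssc.stop(stopSparkContext = false, stopGracefully = true)

// "Resume": build and start a fresh StreamingContext.
ssc = createStreamingContext()
ssc.start()

On queries 3 and 4, the quoted exception itself states the constraint: two concurrently running streams must not share a consumer group id. The usual pattern for blue/green is therefore to give the green deployment its own group id (seeding its starting offsets from the blue deployment's committed offsets if continuity matters) rather than sharing one.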