Spark on Kubernetes
Respected Sir/Madam,

I am Tarunraghav, and I have a query regarding Spark on Kubernetes. We have an EKS cluster in which Spark runs inside pods. We set the executor memory to 1 GB, set the number of executor instances to 2, and enabled dynamic allocation. When I try to read a 3 GB CSV or Parquet file, I expect two executor pods to be created, but the number of executor pods stays at zero. I don't know why no executor pods are being created, even though I set executor instances to 2. Please suggest a solution.

Thanks & Regards,
Tarunraghav
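For reference, a hedged sketch of a spark-submit invocation for this kind of setup (the API server address, namespace, service account, and image name are placeholders, and the shuffle-tracking flag assumes Spark 3.0+). On Kubernetes there is no external shuffle service, so dynamic allocation also needs spark.dynamicAllocation.shuffleTracking.enabled=true; a common cause of zero executor pods is the driver's service account lacking RBAC permission to create pods, which shows up in the driver log and in `kubectl describe pod` events.

```shell
# Sketch only: replace the placeholders with your cluster's values.
spark-submit \
  --master k8s://https://<eks-api-server>:443 \
  --deploy-mode cluster \
  --name csv-read-test \
  --conf spark.kubernetes.namespace=spark \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.container.image=<your-spark-image> \
  --conf spark.executor.instances=2 \
  --conf spark.executor.memory=1g \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=2 \
  local:///opt/spark/app/your-app.jar
```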
Re: newbie question about RDD
Sorry, I forgot to ask: how can I use the Spark context here? I have the HDFS directory path of the files, as well as the name node of the HDFS cluster. Thanks for your help.

On Mon, Nov 21, 2016 at 9:45 PM, Raghav wrote:
> Hi,
>
> I am extremely new to Spark. I have to read a file from HDFS and get it
> into memory in RDD format.
>
> I have a Java class as follows:
>
> class Person {
>     private long UUID;
>     private String FirstName;
>     private String LastName;
>     private String zip;
>     // public methods
> }
>
> The file in HDFS is as follows:
>
> UUID  FirstName  LastName  Zip
> 7462  John       Doll      06903
> 5231  Brad       Finley    32820
>
> Can someone point me to how to get a JavaRDD<Person> object by reading the
> file in HDFS?
>
> Thanks.
>
> --
> Raghav

--
Raghav
newbie question about RDD
Hi,

I am extremely new to Spark. I have to read a file from HDFS and get it into memory in RDD format.

I have a Java class as follows:

class Person {
    private long UUID;
    private String FirstName;
    private String LastName;
    private String zip;
    // public methods
}

The file in HDFS is as follows:

UUID  FirstName  LastName  Zip
7462  John       Doll      06903
5231  Brad       Finley    32820

Can someone point me to how to get a JavaRDD<Person> object by reading the file in HDFS?

Thanks.

--
Raghav
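A hedged sketch of the record-parsing half of this task, written as plain Java so it runs without Spark on the classpath. With Spark available, the same logic would typically be applied inside something like `sc.textFile("hdfs://<namenode>:8020/path").map(PersonParser::parse)` to yield a `JavaRDD<Person>`; the class and method names here are illustrative, not from the original post.

```java
// Plain-Java sketch of parsing one record of the HDFS file into a Person.
public class PersonParser {
    static class Person {
        long uuid;
        String firstName;
        String lastName;
        String zip;
    }

    // Parse one whitespace-separated record: UUID FirstName LastName Zip
    static Person parse(String line) {
        String[] f = line.trim().split("\\s+");
        Person p = new Person();
        p.uuid = Long.parseLong(f[0]);
        p.firstName = f[1];
        p.lastName = f[2];
        p.zip = f[3];
        return p;
    }

    public static void main(String[] args) {
        Person p = parse("5231 Brad Finley 32820");
        System.out.println(p.uuid + " " + p.firstName + " " + p.lastName + " " + p.zip);
        // prints: 5231 Brad Finley 32820
    }
}
```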
Kafka Producer within a docker Instance
Hi,

I run a Spark job where the executor is inside a Docker instance. I want to push the Spark job output (one record at a time) to a Kafka broker that is outside the Docker instance. Has anyone tried anything like this, where the Kafka producer is inside Docker and the broker is outside? I am a newbie to both Spark and Kafka, and am looking for some pointers to start exploring.

Thanks.

--
Raghav
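A hedged configuration sketch, using only `java.util.Properties` so it compiles without the kafka-clients jar; the broker address is a placeholder. The usual pitfall in this setup is networking, not code: `bootstrap.servers` must name an address reachable from inside the container, and the broker's `advertised.listeners` must resolve from there as well, or the producer will connect and then fail on the metadata it gets back.

```java
import java.util.Properties;

// Sketch of producer configuration for a producer running inside Docker
// talking to a broker outside the container. Class and method names are
// illustrative; the serializer class names are Kafka's standard ones.
public class ProducerConfigSketch {
    static Properties producerProps(String brokerHostPort) {
        Properties props = new Properties();
        // Must be reachable from inside the container (not "localhost").
        props.put("bootstrap.servers", brokerHostPort);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        Properties p = producerProps("broker.example.com:9092");
        System.out.println(p.getProperty("bootstrap.servers"));
        // With kafka-clients on the classpath, the next step would be:
        //   new KafkaProducer<String, String>(p).send(new ProducerRecord<>("topic", value));
    }
}
```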
Re: Newbie question - Best way to bootstrap with Spark
Thanks a ton, guys.

On Sun, Nov 6, 2016 at 4:57 PM, raghav wrote:
> I am a newbie in the world of big data analytics, and I want to teach myself
> Apache Spark and be able to write scripts to tinker with data.
>
> I have some understanding of MapReduce but have not had a chance to get my
> hands dirty. There are tons of resources for Spark, but I am looking for
> some guidance on starter material or videos.
>
> Thanks.
>
> Raghav
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Newbie-question-Best-way-to-bootstrap-with-Spark-tp28032.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.

--
Raghav
Re: Newbie question - Best way to bootstrap with Spark
Can you please point out the right courses from EDX/Berkeley? Many thanks.

On Sun, Nov 6, 2016 at 6:08 PM, ayan guha wrote:
> I would start with the Spark documentation, really. Then you would probably
> move on to some older videos from YouTube, especially the Spark Summit
> 2014, 2015, and 2016 videos. Regarding practice, I would strongly suggest
> Databricks cloud (or download a prebuilt package from the Spark site). You
> can also take courses from EDX/Berkeley, which are very good starter
> courses.
>
> On Mon, Nov 7, 2016 at 11:57 AM, raghav wrote:
>> I am a newbie in the world of big data analytics, and I want to teach
>> myself Apache Spark and be able to write scripts to tinker with data.
>>
>> I have some understanding of MapReduce but have not had a chance to get my
>> hands dirty. There are tons of resources for Spark, but I am looking for
>> some guidance on starter material or videos.
>>
>> Thanks.
>>
>> Raghav
>
> --
> Best Regards,
> Ayan Guha
Newbie question - Best way to bootstrap with Spark
I am a newbie in the world of big data analytics, and I want to teach myself Apache Spark and be able to write scripts to tinker with data.

I have some understanding of MapReduce but have not had a chance to get my hands dirty. There are tons of resources for Spark, but I am looking for some guidance on starter material or videos.

Thanks.

Raghav

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Newbie-question-Best-way-to-bootstrap-with-Spark-tp28032.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Submitting Spark Applications using Spark Submit
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Also, the above error says: connection refused to ec2-XXX.compute-1.amazonaws.com/10.165.103.16:7077. I don't understand where it gets the 10.165.103.16 from; I never specify that in the master URL command-line parameter. Any ideas on what I might be doing wrong?

> On Jun 19, 2015, at 7:19 PM, Andrew Or wrote:
>
> Hi Raghav,
>
> I'm assuming you're using standalone mode. When using the Spark EC2
> scripts, you need to make sure that every machine has the most updated
> jars. Once you have built on one of the nodes, you must rsync the Spark
> directory to the rest of the nodes (see /root/spark-ec2/copy-dir).
>
> That said, I usually build it locally on my laptop and scp the assembly jar
> to the cluster instead of building it there. The EC2 machines often take
> much longer to build for some reason. Also, it's cumbersome to set up a
> proper IDE there.
>
> -Andrew
>
> 2015-06-19 19:11 GMT-07:00 Raghav Shankar:
> Thanks Andrew! Is this all I have to do when using the Spark EC2 script to
> set up a Spark cluster? It seems to be getting an assembly jar that is not
> from my project (perhaps from a Maven repo). Is there a way to make the EC2
> script use the assembly jar that I created?
>
> Thanks,
> Raghav
>
> On Friday, June 19, 2015, Andrew Or wrote:
> Hi Raghav,
>
> If you want to make changes to Spark and run your application with it, you
> may follow these steps.
>
> 1. git clone git@github.com:apache/spark
> 2. cd spark; build/mvn clean package -DskipTests [...]
> 3. make local changes
> 4. build/mvn package -DskipTests [...] (no need to clean again here)
> 5. bin/spark-submit --master spark://[...] --class your.main.class your.jar
>
> No need to pass in extra --driver-java-options or --driver-extra-classpath
> as others have suggested. When using spark-submit, the main jar comes from
> assembly/target/scala_2.10, which is prepared through "mvn package". You
> just have to make sure that you re-package the assembly jar after each
> modification.
>
> -Andrew
>
> 2015-06-18 16:35 GMT-07:00 maxdml:
> You can specify the jars of your application to be included with
> spark-submit with the --jars switch.
>
> Otherwise, are you sure that your newly compiled Spark jar assembly is in
> assembly/target/scala-2.10/?
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352p23400.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
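The steps Andrew lists can be sketched end-to-end as a shell session (the master URL, jar path, and the copy-dir step for an EC2 cluster are placeholders; copy-dir is the script Andrew mentions for pushing the rebuilt directory to the worker nodes):

```shell
# Build a modified Spark and submit against it; paths are illustrative.
git clone git@github.com:apache/spark
cd spark
build/mvn clean package -DskipTests        # first full build
# ... make local changes to e.g. core/src/main/scala/org/apache/spark/rdd/RDD.scala ...
build/mvn package -DskipTests              # re-package the assembly after each change
bin/spark-submit --master spark://<master-host>:7077 \
  --class your.main.class /path/to/your.jar

# On a cluster built with the spark-ec2 scripts, also sync the rebuilt
# directory to every worker, e.g.:
#   /root/spark-ec2/copy-dir /root/spark
```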
Re: Submitting Spark Applications using Spark Submit
Thanks Andrew! Is this all I have to do when using the Spark EC2 script to set up a Spark cluster? It seems to be getting an assembly jar that is not from my project (perhaps from a Maven repo). Is there a way to make the EC2 script use the assembly jar that I created?

Thanks,
Raghav

On Friday, June 19, 2015, Andrew Or wrote:
> Hi Raghav,
>
> If you want to make changes to Spark and run your application with it, you
> may follow these steps.
>
> 1. git clone git@github.com:apache/spark
> 2. cd spark; build/mvn clean package -DskipTests [...]
> 3. make local changes
> 4. build/mvn package -DskipTests [...] (no need to clean again here)
> 5. bin/spark-submit --master spark://[...] --class your.main.class your.jar
>
> No need to pass in extra --driver-java-options or --driver-extra-classpath
> as others have suggested. When using spark-submit, the main jar comes from
> assembly/target/scala_2.10, which is prepared through "mvn package". You
> just have to make sure that you re-package the assembly jar after each
> modification.
>
> -Andrew
>
> 2015-06-18 16:35 GMT-07:00 maxdml:
>> You can specify the jars of your application to be included with
>> spark-submit with the --jars switch.
>>
>> Otherwise, are you sure that your newly compiled Spark jar assembly is in
>> assembly/target/scala-2.10/?
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352p23400.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Implementing top() using treeReduce()
So, would I add the assembly jar to just the master, or would I have to add it to all the slaves/workers too?

Thanks,
Raghav

> On Jun 17, 2015, at 5:13 PM, DB Tsai wrote:
>
> You need to build the Spark assembly with your modification and deploy
> it into the cluster.
>
> Sincerely,
>
> DB Tsai
> --
> Blog: https://www.dbtsai.com
> PGP Key ID: 0xAF08DF8D
>
> On Wed, Jun 17, 2015 at 5:11 PM, Raghav Shankar wrote:
>> I've implemented this in the suggested manner. When I build Spark and
>> attach the new spark-core jar to my Eclipse project, I am able to use the
>> new method. In order to conduct the experiments, I need to launch my app
>> on a cluster. I am using EC2. When I set up my master and slaves using the
>> EC2 setup scripts, it sets up Spark, but I think my custom-built
>> spark-core jar is not being used. How do I set it up on EC2 so that my
>> custom version of spark-core is used?
>>
>> Thanks,
>> Raghav
>>
>> On Jun 9, 2015, at 7:41 PM, DB Tsai wrote:
>>
>> Having the following code in RDD.scala works for me. PS: in the following
>> code, I merge the smaller queue into the larger one. I wonder if this will
>> help performance. Let me know when you do the benchmark.
>>
>> def treeTakeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
>>   if (num == 0) {
>>     Array.empty
>>   } else {
>>     val mapRDDs = mapPartitions { items =>
>>       // Priority keeps the largest elements, so let's reverse the ordering.
>>       val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
>>       queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
>>       Iterator.single(queue)
>>     }
>>     if (mapRDDs.partitions.length == 0) {
>>       Array.empty
>>     } else {
>>       mapRDDs.treeReduce { (queue1, queue2) =>
>>         if (queue1.size > queue2.size) {
>>           queue1 ++= queue2
>>           queue1
>>         } else {
>>           queue2 ++= queue1
>>           queue2
>>         }
>>       }.toArray.sorted(ord)
>>     }
>>   }
>> }
>>
>> def treeTop(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
>>   treeTakeOrdered(num)(ord.reverse)
>> }
>>
>> Sincerely,
>>
>> DB Tsai
>> --
>> Blog: https://www.dbtsai.com
>> PGP Key ID: 0xAF08DF8D
>>
>> On Tue, Jun 9, 2015 at 10:09 AM, raggy wrote:
>>> I am trying to implement top-k in Scala within Apache Spark. I am aware
>>> that Spark has a top action. But top() uses reduce(); instead, I would
>>> like to use treeReduce(). I am trying to compare the performance of
>>> reduce() and treeReduce().
>>>
>>> The main issue I have is that I cannot use these two lines of code, which
>>> are used in the top() action, within my Spark application:
>>>
>>> val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
>>> queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
>>>
>>> How can I go about implementing top() using treeReduce()?
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Implementing-top-using-treeReduce-tp23227.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
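The per-partition bounded queue and the "merge the smaller queue into the larger one" step from the quoted treeTakeOrdered can be sketched without Spark, using a plain `java.util.PriorityQueue`. `BoundedTopK` here is an illustrative stand-in for Spark's internal BoundedPriorityQueue, not its actual API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Bounded top-k accumulator: a min-heap capped at `num` keeps the k largest.
public class BoundedTopK {
    final int num;
    final PriorityQueue<Integer> heap = new PriorityQueue<>();

    BoundedTopK(int num) { this.num = num; }

    void offer(int x) {
        heap.offer(x);
        if (heap.size() > num) heap.poll(); // evict the smallest
    }

    // Merge the smaller queue into the larger one, as in the quoted code.
    static BoundedTopK merge(BoundedTopK a, BoundedTopK b) {
        BoundedTopK big = a.heap.size() >= b.heap.size() ? a : b;
        BoundedTopK small = (big == a) ? b : a;
        for (int x : small.heap) big.offer(x);
        return big;
    }

    List<Integer> sortedDesc() {
        List<Integer> out = new ArrayList<>(heap);
        out.sort(Collections.reverseOrder());
        return out;
    }

    public static void main(String[] args) {
        // Two "partitions" reduced independently, then merged -- the shape of
        // each treeReduce combine step.
        BoundedTopK p1 = new BoundedTopK(3), p2 = new BoundedTopK(3);
        for (int x : new int[]{5, 1, 9, 3}) p1.offer(x);
        for (int x : new int[]{8, 2, 7})    p2.offer(x);
        System.out.println(merge(p1, p2).sortedDesc()); // prints: [9, 8, 7]
    }
}
```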
Re: Implementing top() using treeReduce()
I've implemented this in the suggested manner. When I build Spark and attach the new spark-core jar to my Eclipse project, I am able to use the new method. In order to conduct the experiments, I need to launch my app on a cluster. I am using EC2. When I set up my master and slaves using the EC2 setup scripts, it sets up Spark, but I think my custom-built spark-core jar is not being used. How do I set it up on EC2 so that my custom version of spark-core is used?

Thanks,
Raghav

> On Jun 9, 2015, at 7:41 PM, DB Tsai wrote:
>
> Having the following code in RDD.scala works for me. PS: in the following
> code, I merge the smaller queue into the larger one. I wonder if this will
> help performance. Let me know when you do the benchmark.
>
> def treeTakeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
>   if (num == 0) {
>     Array.empty
>   } else {
>     val mapRDDs = mapPartitions { items =>
>       // Priority keeps the largest elements, so let's reverse the ordering.
>       val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
>       queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
>       Iterator.single(queue)
>     }
>     if (mapRDDs.partitions.length == 0) {
>       Array.empty
>     } else {
>       mapRDDs.treeReduce { (queue1, queue2) =>
>         if (queue1.size > queue2.size) {
>           queue1 ++= queue2
>           queue1
>         } else {
>           queue2 ++= queue1
>           queue2
>         }
>       }.toArray.sorted(ord)
>     }
>   }
> }
>
> def treeTop(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
>   treeTakeOrdered(num)(ord.reverse)
> }
>
> Sincerely,
>
> DB Tsai
> --
> Blog: https://www.dbtsai.com
> PGP Key ID: 0xAF08DF8D
>
> On Tue, Jun 9, 2015 at 10:09 AM, raggy wrote:
>> I am trying to implement top-k in Scala within Apache Spark. I am aware
>> that Spark has a top action. But top() uses reduce(); instead, I would
>> like to use treeReduce(). I am trying to compare the performance of
>> reduce() and treeReduce().
>>
>> The main issue I have is that I cannot use these two lines of code, which
>> are used in the top() action, within my Spark application:
>>
>> val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
>> queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
>>
>> How can I go about implementing top() using treeReduce()?
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Implementing-top-using-treeReduce-tp23227.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Submitting Spark Applications using Spark Submit
To clarify, I am using the Spark standalone cluster.

On Tuesday, June 16, 2015, Yanbo Liang wrote:
> If you run Spark on YARN, the simplest way is to replace
> $SPARK_HOME/lib/spark-.jar with your own version of the Spark jar file and
> run your application.
> The spark-submit script will upload this jar to the YARN cluster
> automatically, and then you can run your application as usual.
> It does not care about which version of Spark is in your YARN cluster.
>
> 2015-06-17 10:42 GMT+08:00 Raghav Shankar:
>> The documentation says spark.driver.userClassPathFirst can only be used
>> in cluster mode. Does this mean I have to set the --deploy-mode option
>> for spark-submit to cluster? Or can I still use the default client mode?
>> My understanding is that even in the default deploy mode, Spark still
>> uses the slave machines I have on EC2.
>>
>> Also, the spark.driver.extraLibraryPath property mentions that I can
>> provide a path for special libraries in the spark-submit command-line
>> options. Do my jar files in this path have to have the same name as the
>> jar used by Spark, or is it intelligent enough to identify that two jars
>> are supposed to be the same thing? If they are supposed to have the same
>> name, how can I find out the name I should use for my jar? E.g., if I
>> just name my modified spark-core jar spark.jar, put it in a lib folder,
>> and provide the path of the folder to spark-submit, would that be enough
>> to tell Spark to use that spark-core jar instead of the default?
>>
>> Thanks,
>> Raghav
>>
>> On Jun 16, 2015, at 7:19 PM, Will Briggs wrote:
>>
>> If this is research-only, and you don't want to have to worry about
>> updating the jars installed by default on the cluster, you can add your
>> custom Spark jar using the "spark.driver.extraLibraryPath" configuration
>> property when running spark-submit, and then use the experimental
>> "spark.driver.userClassPathFirst" config to force it to use yours.
>>
>> See here for more details and options:
>> https://spark.apache.org/docs/1.4.0/configuration.html
>>
>> On June 16, 2015, at 10:12 PM, Raghav Shankar wrote:
>>
>> I made the change so that I could implement top() using treeReduce(). A
>> member on here suggested I make the change in RDD.scala to accomplish
>> that. Also, this is for a research project, and not for commercial use.
>>
>> So, any advice on how I can get spark-submit to use my custom-built jars
>> would be very useful.
>>
>> Thanks,
>> Raghav
>>
>> On Jun 16, 2015, at 6:57 PM, Will Briggs wrote:
>>
>> In general, you should avoid making direct changes to the Spark source
>> code. If you are using Scala, you can seamlessly blend your own methods
>> on top of the base RDDs using implicit conversions.
>>
>> Regards,
>> Will
>>
>> On June 16, 2015, at 7:53 PM, raggy wrote:
>>
>> I am trying to submit a Spark application using the command line. I used
>> the spark-submit command for doing so. I initially set up my Spark
>> application in Eclipse and have been making changes there. I recently
>> obtained my own version of the Spark source code and added a new method
>> to RDD.scala. I created a new spark-core jar using mvn, and I added it to
>> my Eclipse build path. My application ran perfectly fine.
>>
>> Now, I would like to submit it through the command line. I submitted my
>> application like this:
>>
>> bin/spark-submit --master local[2] --class "SimpleApp"
>> /Users/XXX/Desktop/spark2.jar
>>
>> The spark-submit command is within the Spark project that I modified by
>> adding new methods.
>> When I do so, I get this error:
>>
>> java.lang.NoSuchMethodError:
>> org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object;
>> at SimpleApp$.main(SimpleApp.scala:12)
>> at SimpleApp.main(SimpleApp.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala
Re: Submitting Spark Applications using Spark Submit
The documentation says spark.driver.userClassPathFirst can only be used in cluster mode. Does this mean I have to set the --deploy-mode option for spark-submit to cluster? Or can I still use the default client mode? My understanding is that even in the default deploy mode, Spark still uses the slave machines I have on EC2.

Also, the spark.driver.extraLibraryPath property mentions that I can provide a path for special libraries in the spark-submit command-line options. Do my jar files in this path have to have the same name as the jar used by Spark, or is it intelligent enough to identify that two jars are supposed to be the same thing? If they are supposed to have the same name, how can I find out the name I should use for my jar? E.g., if I just name my modified spark-core jar spark.jar, put it in a lib folder, and provide the path of the folder to spark-submit, would that be enough to tell Spark to use that spark-core jar instead of the default?

Thanks,
Raghav

> On Jun 16, 2015, at 7:19 PM, Will Briggs wrote:
>
> If this is research-only, and you don't want to have to worry about
> updating the jars installed by default on the cluster, you can add your
> custom Spark jar using the "spark.driver.extraLibraryPath" configuration
> property when running spark-submit, and then use the experimental
> "spark.driver.userClassPathFirst" config to force it to use yours.
>
> See here for more details and options:
> https://spark.apache.org/docs/1.4.0/configuration.html
>
> On June 16, 2015, at 10:12 PM, Raghav Shankar wrote:
>
> I made the change so that I could implement top() using treeReduce(). A
> member on here suggested I make the change in RDD.scala to accomplish that.
> Also, this is for a research project, and not for commercial use.
>
> So, any advice on how I can get spark-submit to use my custom-built jars
> would be very useful.
>
> Thanks,
> Raghav
>
>> On Jun 16, 2015, at 6:57 PM, Will Briggs wrote:
>>
>> In general, you should avoid making direct changes to the Spark source
>> code. If you are using Scala, you can seamlessly blend your own methods
>> on top of the base RDDs using implicit conversions.
>>
>> Regards,
>> Will
>>
>> On June 16, 2015, at 7:53 PM, raggy wrote:
>>
>> I am trying to submit a Spark application using the command line. I used
>> the spark-submit command for doing so. I initially set up my Spark
>> application in Eclipse and have been making changes there. I recently
>> obtained my own version of the Spark source code and added a new method
>> to RDD.scala. I created a new spark-core jar using mvn, and I added it to
>> my Eclipse build path. My application ran perfectly fine.
>>
>> Now, I would like to submit it through the command line. I submitted my
>> application like this:
>>
>> bin/spark-submit --master local[2] --class "SimpleApp"
>> /Users/XXX/Desktop/spark2.jar
>>
>> The spark-submit command is within the Spark project that I modified by
>> adding new methods.
>> When I do so, I get this error:
>>
>> java.lang.NoSuchMethodError:
>> org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object;
>> at SimpleApp$.main(SimpleApp.scala:12)
>> at SimpleApp.main(SimpleApp.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> When I use spark-submit, where does the jar come from? How do I make sure
>> it uses the jars that I have built?
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Submitting Spark Applications using Spark Submit
I made the change so that I could implement top() using treeReduce(). A member on here suggested I make the change in RDD.scala to accomplish that. Also, this is for a research project, and not for commercial use.

So, any advice on how I can get spark-submit to use my custom-built jars would be very useful.

Thanks,
Raghav

> On Jun 16, 2015, at 6:57 PM, Will Briggs wrote:
>
> In general, you should avoid making direct changes to the Spark source
> code. If you are using Scala, you can seamlessly blend your own methods on
> top of the base RDDs using implicit conversions.
>
> Regards,
> Will
>
> On June 16, 2015, at 7:53 PM, raggy wrote:
>
> I am trying to submit a Spark application using the command line. I used
> the spark-submit command for doing so. I initially set up my Spark
> application in Eclipse and have been making changes there. I recently
> obtained my own version of the Spark source code and added a new method to
> RDD.scala. I created a new spark-core jar using mvn, and I added it to my
> Eclipse build path. My application ran perfectly fine.
>
> Now, I would like to submit it through the command line. I submitted my
> application like this:
>
> bin/spark-submit --master local[2] --class "SimpleApp"
> /Users/XXX/Desktop/spark2.jar
>
> The spark-submit command is within the Spark project that I modified by
> adding new methods.
> When I do so, I get this error:
>
> java.lang.NoSuchMethodError:
> org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object;
> at SimpleApp$.main(SimpleApp.scala:12)
> at SimpleApp.main(SimpleApp.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> When I use spark-submit, where does the jar come from? How do I make sure
> it uses the jars that I have built?
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Different Sorting RDD methods in Apache Spark
Thank you for your responses!

You mention that it only works as long as the data fits on a single machine. What I am trying to do is receive the sorted contents of my dataset. For this to be possible, the entire dataset has to fit on a single machine anyway. Are you saying that sorting the entire dataset and collecting it on the driver node is not a typical use case? If I want to do this using sortBy(), I would first call sortBy() followed by a collect(). collect() would involve gathering all the data on a single machine as well.

Thanks,
Raghav

On Tuesday, June 9, 2015, Mark Hamstra wrote:
> Correct. Trading away scalability for increased performance is not an
> option for the standard Spark API.
>
> On Tue, Jun 9, 2015 at 3:05 AM, Daniel Darabos wrote:
>> It would be even faster to load the data on the driver and sort it there
>> without using Spark :). Using reduce() is cheating, because it only works
>> as long as the data fits on one machine. That is not the targeted use
>> case of a distributed computation system. You can repeat your test with
>> more data (that doesn't fit on one machine) to see what I mean.
>>
>> On Tue, Jun 9, 2015 at 8:30 AM, raggy wrote:
>>> For a research project, I tried sorting the elements in an RDD. I did
>>> this in two different approaches.
>>>
>>> In the first method, I applied a mapPartitions() function on the RDD, so
>>> that it would sort the contents of each partition and provide a result
>>> RDD that contains the sorted list as the only record in the RDD. Then, I
>>> applied a reduce function which basically merges sorted lists.
>>>
>>> I ran these experiments on an EC2 cluster containing 30 nodes. I set it
>>> up using the spark-ec2 script. The data file was stored in HDFS.
>>>
>>> In the second approach I used the sortBy method in Spark.
>>>
>>> I performed these operations on the US census data (100 MB) found here.
>>> A single line looks like this:
>>>
>>> 9, Not in universe, 0, 0, Children, 0, Not in universe, Never married,
>>> Not in universe or children, Not in universe, White, All other, Female,
>>> Not in universe, Not in universe, Children or Armed Forces, 0, 0, 0,
>>> Nonfiler, Not in universe, Not in universe, Child <18 never marr not in
>>> subfamily, Child under 18 never married, 1758.14, Nonmover, Nonmover,
>>> Nonmover, Yes, Not in universe, 0, Both parents present, United-States,
>>> United-States, United-States, Native- Born in the United States, 0, Not
>>> in universe, 0, 0, 94, - 5.
>>>
>>> I sorted based on the 25th value in the CSV. In this line that is
>>> 1758.14.
>>>
>>> I noticed that sortBy performs worse than the other method. Is this the
>>> expected scenario? If it is, why wouldn't the mapPartitions() and
>>> reduce() approach be the default sorting approach?
>>>
>>> Here is my implementation:
>>>
>>> public static void sortBy(JavaSparkContext sc) {
>>>     JavaRDD<String> rdd = sc.textFile("/data.txt", 32);
>>>     long start = System.currentTimeMillis();
>>>     rdd.sortBy(new Function<String, Double>() {
>>>         @Override
>>>         public Double call(String v1) throws Exception {
>>>             String[] arr = v1.split(",");
>>>             return Double.parseDouble(arr[24]);
>>>         }
>>>     }, true, 9).collect();
>>>     long end = System.currentTimeMillis();
>>>     System.out.println("SortBy: " + (end - start));
>>> }
>>>
>>> public static void sortList(JavaSparkContext sc) {
>>>     JavaRDD<String> rdd = sc.textFile("/data.txt", 32);
>>>     long start = System.currentTimeMillis();
>>>     JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 = rdd.mapPartitions(
>>>         new FlatMapFunction<Iterator<String>,
>>>                             LinkedList<Tuple2<Double, String>>>() {
>>>         @Override
>>>         public Iterable<LinkedList<Tuple2<Double, String>>>
>>>                 call(Iterator<String> t) throws Exception {
>>>             LinkedList<Tuple2<Double, String>> lines =
>>>                 new LinkedList<Tuple2<Double, String>>();
>>>             while (t.hasNext()) {
>>>                 String s = t.next();
>>>                 String[] arr1 = s.split(",");
>>>                 Tuple2<Double, String> t1 =
>>>                     new Tuple2<Double, String>(Double.parseDouble(arr1[24]), s);
>>>                 lines.add(t1);
>>>             }
>>>             Collections.sort(lines, new IncomeComparator());
>>>             LinkedList<LinkedList<Tuple2<Double, String>>> list =
>>>                 new LinkedList<LinkedList<Tuple2<Double, String>>>();
>>>             list.add(lines);
>>>             return list;
>>>         }
>>>     });
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Different-Sorting-RDD-methods-in-Apache-Spark-tp23214.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
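The sort key from the quoted benchmark (the 25th comma-separated field, index 24, parsed as a double) can be exercised without Spark as plain Java; the two sample rows below are fabricated for illustration, with only field 24 meaningful.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java version of the key extraction used by both sortBy() and the
// comparator in the quoted code.
public class SortByField24 {
    static double key(String line) {
        // 25th CSV value = index 24 after splitting on commas.
        return Double.parseDouble(line.split(",")[24].trim());
    }

    public static void main(String[] args) {
        String rowA = "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x, 1758.14 ,z";
        String rowB = "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x, 42.5 ,z";
        List<String> rows = Arrays.asList(rowA, rowB);
        rows.sort(Comparator.comparingDouble(SortByField24::key)); // ascending
        System.out.println(key(rows.get(0))); // prints: 42.5
    }
}
```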
Re: TreeReduce Functionality in Spark
Hey DB, Thanks for the reply! I still don't think this answers my question. For example, if I have a top() action being executed and I have 32 workers(32 partitions), and I choose a depth of 4, what does the overlay of intermediate reducers look like? How many reducers are there excluding the master and the worker? How many partitions get sent to each of these intermediate reducers? Does this number vary at each level? Thanks! On Thursday, June 4, 2015, DB Tsai wrote: > By default, the depth of the tree is 2. Each partition will be one node. > > Sincerely, > > DB Tsai > --- > Blog: https://www.dbtsai.com > > > On Thu, Jun 4, 2015 at 10:46 AM, Raghav Shankar > wrote: > > Hey Reza, > > > > Thanks for your response! > > > > Your response clarifies some of my initial thoughts. However, what I > don't > > understand is how the depth of the tree is used to identify how many > > intermediate reducers there will be, and how many partitions are sent to > the > > intermediate reducers. Could you provide some insight into this? > > > > Thanks, > > Raghav > > > > On Thursday, June 4, 2015, Reza Zadeh > wrote: > >> > >> In a regular reduce, all partitions have to send their reduced value to > a > >> single machine, and that machine can become a bottleneck. > >> > >> In a treeReduce, the partitions talk to each other in a logarithmic > number > >> of rounds. Imagine a binary tree that has all the partitions at its > leaves > >> and the root will contain the final reduced value. This way there is no > >> single bottleneck machine. > >> > >> It remains to decide the number of children each node should have and > how > >> deep the tree should be, which is some of the logic in the method you > >> pasted. > >> > >> On Wed, Jun 3, 2015 at 7:10 PM, raggy > wrote: > >>> > >>> I am trying to understand what the treeReduce function for an RDD does, > >>> and > >>> how it is different from the normal reduce function. 
My current > >>> understanding is that treeReduce tries to split up the reduce into > >>> multiple > >>> steps. We do a partial reduce on different nodes, and then a final > reduce > >>> is > >>> done to get the final result. Is this correct? If so, I guess what I am > >>> curious about is, how does spark decide how many nodes will be on each > >>> level, and how many partitions will be sent to a given node? > >>> > >>> The bulk of the implementation is within this function: > >>> > >>> partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth) > >>> .getOrElse(throw new UnsupportedOperationException("empty > >>> collection")) > >>> > >>> The above function is expanded to > >>> > >>> val cleanSeqOp = context.clean(seqOp) > >>> val cleanCombOp = context.clean(combOp) > >>> val aggregatePartition = > >>> (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, > >>> cleanCombOp) > >>> var partiallyAggregated = mapPartitions(it => > >>> Iterator(aggregatePartition(it))) > >>> var numPartitions = partiallyAggregated.partitions.length > >>> val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / > >>> depth)).toInt, 2) > >>> // If creating an extra level doesn't help reduce > >>> // the wall-clock time, we stop tree aggregation. > >>> while (numPartitions > scale + numPartitions / scale) { > >>> numPartitions /= scale > >>> val curNumPartitions = numPartitions > >>> partiallyAggregated = > partiallyAggregated.mapPartitionsWithIndex > >>> { > >>> (i, iter) => iter.map((i % curNumPartitions, _)) > >>> }.reduceByKey(new HashPartitioner(curNumPartitions), > >>> cleanCombOp).values > >>> } > >>> partiallyAggregated.reduce(cleanCombOp) > >>> > >>> I am completely lost about what is happening in this function. I would > >>> greatly appreciate some sort of explanation. 
> >>> > >>> > >>> > >>> > >>> -- > >>> View this message in context: > >>> > http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html > >>> Sent from the Apache Spark User List mailing list archive at > Nabble.com. > >>> > >>> - > >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > > >>> For additional commands, e-mail: user-h...@spark.apache.org > > >>> > >> > > >
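For the concrete case asked about above (32 partitions, depth 4), the schedule falls out of the pasted loop: scale = max(ceil(32^(1/4)), 2) = 3, so the partition counts per round are 32 → 10 → 3, i.e. two intermediate reduceByKey rounds, after which the driver reduces the 3 remaining partial results. The loop depends only on the partition count and the depth, so it can be replayed outside Spark; a small Java sketch of just that arithmetic (class and method names are mine):

```java
import java.util.ArrayList;
import java.util.List;

public class TreeAggregateSchedule {
    // Replays the partition-count loop from RDD.treeAggregate: returns
    // the number of partitions alive at each level, starting with the
    // initial count and ending with the count handed to the final
    // reduce on the driver.
    static List<Integer> levels(int numPartitions, int depth) {
        List<Integer> out = new ArrayList<>();
        out.add(numPartitions);
        int scale = Math.max(
            (int) Math.ceil(Math.pow(numPartitions, 1.0 / depth)), 2);
        while (numPartitions > scale + numPartitions / scale) {
            numPartitions /= scale;  // integer division, as in Spark
            out.add(numPartitions);
        }
        return out;
    }

    public static void main(String[] args) {
        // 32 partitions, depth 4: scale = max(ceil(32^(1/4)), 2) = 3
        System.out.println(levels(32, 4));  // [32, 10, 3]
        // Default depth 2: scale = max(ceil(sqrt(32)), 2) = 6
        System.out.println(levels(32, 2));  // [32, 5]
    }
}
```

Partials are routed to intermediate reducers by `i % curNumPartitions`, so each reducer at a round receives roughly `scale` partials; the counts vary per level because of integer division, and with the default depth of 2 the same 32 partitions collapse in a single intermediate round (32 → 5).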
Re: TreeReduce Functionality in Spark
Hey Reza, Thanks for your response! Your response clarifies some of my initial thoughts. However, what I don't understand is how the depth of the tree is used to identify how many intermediate reducers there will be, and how many partitions are sent to the intermediate reducers. Could you provide some insight into this? Thanks, Raghav On Thursday, June 4, 2015, Reza Zadeh wrote: > In a regular reduce, all partitions have to send their reduced value to a > single machine, and that machine can become a bottleneck. > > In a treeReduce, the partitions talk to each other in a logarithmic number > of rounds. Imagine a binary tree that has all the partitions at its leaves > and the root will contain the final reduced value. This way there is no > single bottleneck machine. > > It remains to decide the number of children each node should have and how > deep the tree should be, which is some of the logic in the method you > pasted. > > On Wed, Jun 3, 2015 at 7:10 PM, raggy > wrote: > >> I am trying to understand what the treeReduce function for an RDD does, >> and >> how it is different from the normal reduce function. My current >> understanding is that treeReduce tries to split up the reduce into >> multiple >> steps. We do a partial reduce on different nodes, and then a final reduce >> is >> done to get the final result. Is this correct? If so, I guess what I am >> curious about is, how does spark decide how many nodes will be on each >> level, and how many partitions will be sent to a given node? 
>> >> The bulk of the implementation is within this function: >> >> partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth) >> .getOrElse(throw new UnsupportedOperationException("empty >> collection")) >> >> The above function is expanded to >> >> val cleanSeqOp = context.clean(seqOp) >> val cleanCombOp = context.clean(combOp) >> val aggregatePartition = >> (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, >> cleanCombOp) >> var partiallyAggregated = mapPartitions(it => >> Iterator(aggregatePartition(it))) >> var numPartitions = partiallyAggregated.partitions.length >> val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / >> depth)).toInt, 2) >> // If creating an extra level doesn't help reduce >> // the wall-clock time, we stop tree aggregation. >> while (numPartitions > scale + numPartitions / scale) { >> numPartitions /= scale >> val curNumPartitions = numPartitions >> partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { >> (i, iter) => iter.map((i % curNumPartitions, _)) >> }.reduceByKey(new HashPartitioner(curNumPartitions), >> cleanCombOp).values >> } >> partiallyAggregated.reduce(cleanCombOp) >> >> I am completely lost about what is happening in this function. I would >> greatly appreciate some sort of explanation. >> >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> >> For additional commands, e-mail: user-h...@spark.apache.org >> >> >> >
Re: Task result in Spark Worker Node
My apologies, I had pasted the wrong exception trace in the previous email. Here is the actual exception that I am receiving. Exception in thread "main" java.lang.NullPointerException at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:154) at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) > On Apr 17, 2015, at 2:30 AM, Raghav Shankar wrote: > > Hey Imran, > > Thanks for the great explanation! This cleared up a lot of things for me. I > am actually trying to utilize some of the features within Spark for a system > I am developing. I am currently working on developing a subsystem that can be > integrated within Spark and other Big Data solutions. In order to integrate > it within Spark, I am trying to utilize the rdds and functions provided to > the reduce method on my system. My system is developed in Scala and Java. In > Spark, I have seen that the function provided to the reduce method, along > with the RDD, gets serialized and sent to the worker nodes. The worker nodes > are able to deserialize them and then execute the task on them. I see this > happening in ResultTask.scala. When I try to do something similar, I get > exceptions. The system I am developing has Spark jars in its build path, so > it is able to create a SparkContext etc. > > When I do, > > val bytes = closureSerializer.serialize((rdd, func) : AnyRef).array() > (similar to DAGScheduler.scala) > val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, > Iterator[Int]) => Int)]( > ByteBuffer.wrap(bytes), Thread.currentThread.getContextClassLoader) > println(func2(context, rdd2.iterator(rdd2.partitions(1), context))); > > I get the proper result and can print it out. 
> > But when I involve the network by serializing the data, using the network to > send it to a different program, then deserialize the data and use the > function, I get the following error: > > Exception in thread "main" java.lang.NullPointerException > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36) > at SimpleApp$$anonfun$1.apply(SimpleApp.scala:31) > at SimpleApp$$anonfun$1.apply(SimpleApp.scala:30) > at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37) > at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37) > at SimpleApp$.net(SimpleApp.scala:71) > at SimpleApp$.main(SimpleApp.scala:76) > at SimpleApp.main(SimpleApp.scala) > > I have also made sure that I am adding the class file of the program that is > sending the serialized data to the bin folder of the program that is > receiving the data. I’m not sure what I am doing wrong. I’ve done the > serialization and creation of the function similar to how Spark does it. I > created another reduce function like this. When implemented this way, it > prints out the result of func2 properly. But when I involve the network by > sending the serialized data to another program, I get the above exception. 
> >def reduceMod(f: (Integer, Integer) => Integer): Integer = { > val reducePartition: Iterator[Integer] => Option[Integer] = iter => { > if (iter.hasNext) { > Some(iter.reduceLeft(f)) > } else { > None > } > } > val processFunc = (context: TaskContext, iter: Iterator[Integer]) => > reducePartition(iter) > val func = processFunc.asInstanceOf[(TaskContext, Iterator[Int]) => Int] > context = new TaskContextImpl(stageId = 1, partitionId = 1, > taskAttemptId = 1, attemptNumber = 1, runningLocally = false) > println(func.getClass.getName); > println(func(context, rdd.iterator(rdd.partitions(1), context))); > val bb = closureSerializer.serialize((rdd, func) : AnyRef).array() > val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], > (TaskContext, Iterator[Int]) => Int)]( > ByteBuffer.wrap(bb), Thread.currentThread.getContextClassLoader) > println(func2(context, rdd3.iterator(rdd3.partitions(1), context))); > 1 > } > > I was wondering if you had any ideas on what I am doing wrong, or how I can > properly send the serialized version of the RDD and function to my other > program. My thought is that I might need to add more jars to the build path, > but I have no clue if thats the issue and what jars I need to add. > > Thanks, > Raghav > >> On Ap
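A plausible cause of both NullPointerExceptions above (an inference from the stack traces, not something confirmed in this thread): several of an RDD's fields, including its reference to the SparkContext and, for a parallelized collection, the backing data sequence, are marked @transient, so they are silently dropped during Java serialization and come back null in the receiving JVM. That would explain why the in-process test works but the cross-program one fails once real deserialization into a fresh process is involved. The mechanism needs no Spark at all; the sketch below uses a hypothetical FakeRdd class to show a transient field surviving local use but not a serialization round trip:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {
    // Stands in for an RDD: "context" is transient, like an RDD's
    // SparkContext, so it is usable in-process but dropped on the wire.
    static class FakeRdd implements Serializable {
        transient Object context = new Object();
        int[] data = {1, 2, 3};
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws Exception {
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeRdd original = new FakeRdd();
        FakeRdd copy = (FakeRdd) deserialize(serialize(original));
        System.out.println(original.context != null);  // true
        System.out.println(copy.context == null);      // true: transient field dropped
        System.out.println(copy.data.length);          // 3: plain data survives
    }
}
```

This is why Spark ships tasks (closures plus partition metadata) rather than live RDD objects: anything reachable only through the transient context has to be reconstructed on the executor side, which the executor machinery does and a hand-rolled receiver does not.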
Re: Task result in Spark Worker Node
Hey Imran, Thanks for the great explanation! This cleared up a lot of things for me. I am actually trying to utilize some of the features within Spark for a system I am developing. I am currently working on developing a subsystem that can be integrated within Spark and other Big Data solutions. In order to integrate it within Spark, I am trying to utilize the rdds and functions provided to the reduce method on my system. My system is developed in Scala and Java. In Spark, I have seen that the function provided to the reduce method, along with the RDD, gets serialized and sent to the worker nodes. The worker nodes are able to deserialize them and then execute the task on them. I see this happening in ResultTask.scala. When I try to do something similar, I get exceptions. The system I am developing has Spark jars in its build path, so it is able to create a SparkContext etc. When I do, val bytes = closureSerializer.serialize((rdd, func) : AnyRef).array() (similar to DAGScheduler.scala) val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)]( ByteBuffer.wrap(bytes), Thread.currentThread.getContextClassLoader) println(func2(context, rdd2.iterator(rdd2.partitions(1), context))); I get the proper result and can print it out. 
But when I involve the network by serializing the data, using the network to send it to a different program, then deserialize the data and use the function, I get the following error: Exception in thread "main" java.lang.NullPointerException at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36) at SimpleApp$$anonfun$1.apply(SimpleApp.scala:31) at SimpleApp$$anonfun$1.apply(SimpleApp.scala:30) at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37) at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37) at SimpleApp$.net(SimpleApp.scala:71) at SimpleApp$.main(SimpleApp.scala:76) at SimpleApp.main(SimpleApp.scala) I have also made sure that I am adding the class file of the program that is sending the serialized data to the bin folder of the program that is receiving the data. I’m not sure what I am doing wrong. I’ve done the serialization and creation of the function similar to how Spark does it. I created another reduce function like this. When implemented this way, it prints out the result of func2 properly. But when I involve the network by sending the serialized data to another program, I get the above exception. 
def reduceMod(f: (Integer, Integer) => Integer): Integer = {
  val reducePartition: Iterator[Integer] => Option[Integer] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(f))
    } else {
      None
    }
  }
  val processFunc = (context: TaskContext, iter: Iterator[Integer]) =>
    reducePartition(iter)
  val func = processFunc.asInstanceOf[(TaskContext, Iterator[Int]) => Int]
  context = new TaskContextImpl(stageId = 1, partitionId = 1,
    taskAttemptId = 1, attemptNumber = 1, runningLocally = false)
  println(func.getClass.getName);
  println(func(context, rdd.iterator(rdd.partitions(1), context)));
  val bb = closureSerializer.serialize((rdd, func) : AnyRef).array()
  val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int],
    (TaskContext, Iterator[Int]) => Int)](
    ByteBuffer.wrap(bb), Thread.currentThread.getContextClassLoader)
  println(func2(context, rdd3.iterator(rdd3.partitions(1), context)));
  1
}

I was wondering if you had any ideas on what I am doing wrong, or how I can properly send the serialized version of the RDD and function to my other program. My thought is that I might need to add more jars to the build path, but I have no clue if that's the issue and what jars I need to add.

Thanks,
Raghav

> On Apr 13, 2015, at 10:22 PM, Imran Rashid wrote:
>
> On the worker side, it all happens in Executor.
The task result is computed > here: > > https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210 > > <https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210> > > then its serialized along with some other goodies, and finally sent back to > the driver here: > > https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L255 > > <https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L255> > > What happens on the driver is quite a bit more complicated, and involves a > number of spots in the code, but at least to get you started, the results are > received here: > > https://github.com
Re: Sending RDD object over the network
Hey Akhil, Thanks for your response! No, I am not expecting to receive the values themselves. I am just trying to receive the RDD object on my second Spark application. However, I get a NPE when I try to use the object within my second program. Would you know how I can properly send the RDD object to my second program? Thanks, Raghav On Mon, Apr 6, 2015 at 3:08 AM, Akhil Das wrote: > Are you expecting to receive 1 to 100 values in your second program? > > RDD is just an abstraction, you would need to do like: > > num.foreach(x => send(x)) > > > Thanks > Best Regards > > On Mon, Apr 6, 2015 at 1:56 AM, raggy wrote: > >> For a class project, I am trying to utilize 2 spark Applications >> communicate >> with each other by passing an RDD object that was created from one >> application to another Spark application. The first application is >> developed >> in Scala and creates an RDD and sends it to the 2nd application over the >> network as follows: >> >> val logFile = "../../spark-1.3.0/README.md" // Should be some file on >> your system >> val conf = new SparkConf(); >> conf.setAppName("Simple Application").setMaster("local[2]") >> val sc = new SparkContext(conf) >> val nums = sc.parallelize(1 to 100, 2).toJavaRDD(); >> val s = new Socket("127.0.0.1", 8000); >> val objectOutput = new ObjectOutputStream(s.getOutputStream()); >> objectOutput.writeObject(nums); >> s.close(); >> The second Spark application is a Java application, which tries to receive >> the RDD object and then perform some operations on it. At the moment, I am >> trying to see if I have properly obtained the object. 
>>
>> ServerSocket listener = null;
>> Socket client;
>>
>> try {
>>     listener = new ServerSocket(8000);
>> } catch (Exception e) {
>>     e.printStackTrace();
>> }
>> System.out.println("Listening");
>> try {
>>     client = listener.accept();
>>     ObjectInputStream objectInput = new
>>         ObjectInputStream(client.getInputStream());
>>     Object object = (JavaRDD<Integer>) objectInput.readObject();
>>     JavaRDD<Integer> tmp = (JavaRDD<Integer>) object;
>>
>>     if (tmp != null) {
>>         System.out.println(tmp.getStorageLevel().toString());
>>         List<Partition> p = tmp.partitions();
>>     } else {
>>         System.out.println("variable is null");
>>     }
>> } catch (Exception e) {
>>     e.printStackTrace();
>> }
>> The output I get is:
>>
>> StorageLevel(false, false, false, false, 1)
>> java.lang.NullPointerException
>> at
>> org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:154)
>> at
>> org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>> at
>> org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:56)
>> at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32)
>> at SimpleApp.main(SimpleApp.java:35)
>> So, System.out.println(tmp.getStorageLevel().toString()); prints out
>> properly. But List<Partition> p = tmp.partitions(); throws the
>> NullPointerException. I can't seem to figure out why this is happening.
>>
>> In a nutshell, I am basically trying to create an RDD object in one Spark
>> application and then send the object to another application. After
>> receiving the object I try to make sure I received it properly by
>> accessing its methods. Invoking the partitions() method in the original
>> Spark application does not throw any errors either.
I would greatly appreciate any >> suggestion >> on how I can solve my problem, or an alternative solution for what I am >> trying to accomplish. >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/Sending-RDD-object-over-the-network-tp22382.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> >> >
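Akhil's suggestion, made concrete: instead of writing the JavaRDD object itself to the socket, materialize the values on the driver (e.g. with collect()) and ship those, since a List<Integer> is a plain serializable object with nothing transient to lose. A minimal, Spark-free sketch of the wire exchange the two applications could share (class and method names are made up for illustration; the real sender would pass nums.collect() to the writer):

```java
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SendValuesDemo {
    // Sends a list of values over a loopback socket and returns what
    // the receiving side read back, mimicking the two-application setup
    // from the thread in a single process.
    static List<Integer> roundTrip(List<Integer> values) throws Exception {
        ServerSocket listener = new ServerSocket(0);  // any free port
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Receiver side: reads one serialized List<Integer>.
        Future<List<Integer>> received = pool.submit(() -> {
            try (Socket client = listener.accept();
                 ObjectInputStream in =
                     new ObjectInputStream(client.getInputStream())) {
                @SuppressWarnings("unchecked")
                List<Integer> nums = (List<Integer>) in.readObject();
                return nums;
            }
        });
        // Sender side: in the real program this list would come from
        // nums.collect() on the JavaRDD, not from the RDD object itself.
        try (Socket s = new Socket("127.0.0.1", listener.getLocalPort());
             ObjectOutputStream out =
                 new ObjectOutputStream(s.getOutputStream())) {
            out.writeObject(new ArrayList<>(values));
        }
        List<Integer> result = received.get();
        pool.shutdown();
        listener.close();
        return result;
    }

    public static void main(String[] args) throws Exception {
        List<Integer> values = new ArrayList<>();
        for (int i = 1; i <= 100; i++) values.add(i);
        System.out.println(roundTrip(values).equals(values));  // true
    }
}
```

The receiving application can then call parallelize() on its own SparkContext if it needs the data back as an RDD; what crosses the network is data, not the lineage object.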