Spark on Kubernetes

2024-04-29 Thread Tarun raghav
Respected Sir/Madam,
I am Tarunraghav. I have a query regarding Spark on Kubernetes.

We have an EKS cluster, within which we have Spark installed in the pods.
We set the executor memory to 1 GB and the executor instances to 2, and I
have also enabled dynamic allocation. So when I try to read a 3 GB CSV or
Parquet file, the number of executor pods is supposed to increase by 2,
but it stays at zero.
I don't know why the executor pods aren't being created, even though I set
the executor instances to 2. Please suggest a solution for this.

Thanks & Regards,
Tarunraghav
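
[Editor's note: for reference, a minimal sketch of the configuration described above, assuming Spark 3.x on Kubernetes; the memory, pod counts, and input path are placeholders taken from the question, not a verified fix. One common gotcha is that dynamic allocation on Kubernetes has no external shuffle service, so shuffle tracking usually has to be enabled as well, and spark.executor.instances only sets the initial executor count.]

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("k8s-executor-check")
  // Initial number of executor pods requested at startup.
  .config("spark.executor.instances", "2")
  .config("spark.executor.memory", "1g")
  // With dynamic allocation on Kubernetes, shuffle tracking is typically
  // required because there is no external shuffle service.
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .config("spark.dynamicAllocation.maxExecutors", "4")
  .getOrCreate()

// If no executor pods appear, the driver pod's log and
// `kubectl describe pod <driver-pod>` usually show whether the executor
// requests were rejected (e.g. by a resource quota) or never made.
val df = spark.read.option("header", "true").csv("/path/to/large.csv")

The same settings can also be passed on the spark-submit command line as --conf key=value pairs.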


Re: newbie question about RDD

2016-11-21 Thread Raghav
Sorry, I forgot to ask: how can I use the Spark context here? I have the HDFS
directory path of the files, as well as the name node of the HDFS cluster.

Thanks for your help.

On Mon, Nov 21, 2016 at 9:45 PM, Raghav  wrote:

> Hi
>
> I am extremely new to Spark. I have to read a file from HDFS and get it
> in memory in RDD format.
>
> I have a Java class as follows:
>
> class Person {
> private long UUID;
> private String FirstName;
> private String LastName;
> private String zip;
>
>// public methods
> }
>
> The file in HDFS is as follows:
>
> UUID   FirstName LastName Zip
> 7462   John      Doll     06903
> 5231   Brad      Finley   32820
>
>
> Can someone point me to how to get a JavaRDD<Person> object by reading the
> file in HDFS?
>
> Thanks.
>
> --
> Raghav
>



-- 
Raghav
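
[Editor's note: a minimal sketch of one way to do this, assuming a spark-shell session where sc is already defined and the file is plain text with whitespace-separated columns; the name node host, port, and directory path are placeholders.]

import org.apache.spark.rdd.RDD

case class Person(uuid: Long, firstName: String, lastName: String, zip: String)

// The HDFS URI is built from the name node host/port plus the directory path.
val people: RDD[Person] = sc
  .textFile("hdfs://namenode-host:8020/user/data/people")
  .map(_.trim.split("\\s+"))
  .filter(a => a.length == 4 && a(0).forall(_.isDigit)) // skip the header row
  .map(a => Person(a(0).toLong, a(1), a(2), a(3)))

// For the Java API, people.toJavaRDD() gives the equivalent JavaRDD of Person.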


newbie question about RDD

2016-11-21 Thread Raghav
Hi

I am extremely new to Spark. I have to read a file from HDFS and get it in
memory in RDD format.

I have a Java class as follows:

class Person {
private long UUID;
private String FirstName;
private String LastName;
private String zip;

   // public methods
}

The file in HDFS is as follows:

UUID   FirstName LastName Zip
7462   John      Doll     06903
5231   Brad      Finley   32820


Can someone point me to how to get a JavaRDD<Person> object by reading the
file in HDFS?

Thanks.

-- 
Raghav


Kafka Producer within a docker Instance

2016-11-11 Thread Raghav
Hi

I run a Spark job where the executor is within a Docker instance. I want
to push the Spark job output (one record at a time) to a Kafka broker that is
outside the Docker instance.

Has anyone tried anything like this, where the Kafka producer is inside a Docker
container and the broker is outside? I am a newbie to both Spark and Kafka, and am
looking for some pointers to start exploring.

Thanks.

-- 
Raghav
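
[Editor's note: a common pattern for this is to create one producer per partition inside foreachPartition. A rough sketch follows, assuming an RDD[String] named output, a topic named "results", and a broker address reachable from inside the container; the host and port are placeholders. Whether this works usually comes down to networking: the broker's advertised listener has to resolve from inside the Docker instance, not just from the host.]

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

output.foreachPartition { records =>
  val props = new Properties()
  // Must be an address the container can reach (not "localhost" inside the container).
  props.put("bootstrap.servers", "broker-host.example.com:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  // One producer per partition, closed when the partition is done.
  val producer = new KafkaProducer[String, String](props)
  records.foreach(r => producer.send(new ProducerRecord[String, String]("results", r)))
  producer.close()
}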


Re: Newbie question - Best way to bootstrap with Spark

2016-11-07 Thread Raghav
Thanks a ton, guys.

On Sun, Nov 6, 2016 at 4:57 PM, raghav  wrote:

> I am a newbie in the world of big data analytics, and I want to teach myself
> Apache Spark, and want to be able to write scripts to tinker with data.
>
> I have some understanding of MapReduce but have not had a chance to get my
> hands dirty. There are tons of resources for Spark, but I am looking for
> some guidance on starter material or videos.
>
> Thanks.
>
> Raghav
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Newbie-question-Best-way-to-
> bootstrap-with-Spark-tp28032.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Raghav


Re: Newbie question - Best way to bootstrap with Spark

2016-11-06 Thread Raghav
Can you please point out the right courses from edX/Berkeley?

Many thanks.

On Sun, Nov 6, 2016 at 6:08 PM, ayan guha  wrote:

> I would start with the Spark documentation, really. Then you would probably
> move on to some older videos from YouTube, especially the Spark Summit
> 2014, 2015, and 2016 videos. Regarding practice, I would strongly suggest
> Databricks cloud (or download a prebuilt package from the Spark site). You can
> also take courses from edX/Berkeley, which are very good starter courses.
>
> On Mon, Nov 7, 2016 at 11:57 AM, raghav  wrote:
>
>> I am a newbie in the world of big data analytics, and I want to teach myself
>> Apache Spark, and want to be able to write scripts to tinker with data.
>>
>> I have some understanding of MapReduce but have not had a chance to get my
>> hands dirty. There are tons of resources for Spark, but I am looking for
>> some guidance on starter material or videos.
>>
>> Thanks.
>>
>> Raghav
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Newbie-question-Best-way-to-bootstrap-
>> with-Spark-tp28032.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Newbie question - Best way to bootstrap with Spark

2016-11-06 Thread raghav
I am a newbie in the world of big data analytics, and I want to teach myself
Apache Spark, and want to be able to write scripts to tinker with data.

I have some understanding of MapReduce but have not had a chance to get my
hands dirty. There are tons of resources for Spark, but I am looking for
some guidance on starter material or videos.

Thanks.

Raghav



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Newbie-question-Best-way-to-bootstrap-with-Spark-tp28032.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Submitting Spark Applications using Spark Submit

2015-06-20 Thread Raghav Shankar
 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Also, in the above error it says: connection refused to
ec2-XXX.compute-1.amazonaws.com/10.165.103.16:7077. I don't understand
where it gets the 10.165.103.16 from. I never specify that in the master URL
command line parameter. Any ideas on what I might be doing wrong?

> On Jun 19, 2015, at 7:19 PM, Andrew Or  wrote:
> 
> Hi Raghav,
> 
> I'm assuming you're using standalone mode. When using the Spark EC2 scripts 
> you need to make sure that every machine has the most updated jars. Once you 
> have built on one of the nodes, you must rsync the Spark directory to the 
> rest of the nodes (see /root/spark-ec2/copy-dir).
> 
> That said, I usually build it locally on my laptop and scp the assembly jar 
> to the cluster instead of building it there. The EC2 machines often take much 
> longer to build for some reason. Also it's cumbersome to set up proper IDE 
> there.
> 
> -Andrew
> 
> 
> 2015-06-19 19:11 GMT-07:00 Raghav Shankar :
> Thanks Andrew! Is this all I have to do when using the spark ec2 script to 
> setup a spark cluster? It seems to be getting an assembly jar that is not 
> from my project(perhaps from a maven repo). Is there a way to make the ec2 
> script use the assembly jar that I created?
> 
> Thanks,
> Raghav 
> 
> 
> On Friday, June 19, 2015, Andrew Or  wrote:
> Hi Raghav,
> 
> If you want to make changes to Spark and run your application with it, you 
> may follow these steps.
> 
> 1. git clone g...@github.com:apache/spark
> 2. cd spark; build/mvn clean package -DskipTests [...]
> 3. make local changes
> 4. build/mvn package -DskipTests [...] (no need to clean again here)
> 5. bin/spark-submit --master spark://[...] --class your.main.class your.jar
> 
> No need to pass in extra --driver-java-options or --driver-extra-classpath as 
> others have suggested. When using spark-submit, the main jar comes from 
> assembly/target/scala_2.10, which is prepared through "mvn package". You just 
> have to make sure that you re-package the assembly jar after each 
> modification.
> 
> -Andrew
> 
> 2015-06-18 16:35 GMT-07:00 maxdml :
> You can specify the jars of your application to be included with spark-submit
> with the /--jars/ switch.
> 
> Otherwise, are you sure that your newly compiled spark jar assembly is in
> assembly/target/scala-2.10/?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352p23400.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 
> 



Re: Submitting Spark Applications using Spark Submit

2015-06-19 Thread Raghav Shankar
Thanks Andrew! Is this all I have to do when using the Spark EC2 script to
set up a Spark cluster? It seems to be getting an assembly jar that is not
from my project (perhaps from a Maven repo). Is there a way to make the EC2
script use the assembly jar that I created?

Thanks,
Raghav

On Friday, June 19, 2015, Andrew Or  wrote:

> Hi Raghav,
>
> If you want to make changes to Spark and run your application with it, you
> may follow these steps.
>
> 1. git clone g...@github.com:apache/spark
> 2. cd spark; build/mvn clean package -DskipTests [...]
> 3. make local changes
> 4. build/mvn package -DskipTests [...] (no need to clean again here)
> 5. bin/spark-submit --master spark://[...] --class your.main.class your.jar
>
> No need to pass in extra --driver-java-options or --driver-extra-classpath
> as others have suggested. When using spark-submit, the main jar comes from
> assembly/target/scala_2.10, which is prepared through "mvn package". You
> just have to make sure that you re-package the assembly jar after each
> modification.
>
> -Andrew
>
> 2015-06-18 16:35 GMT-07:00 maxdml  >:
>
>> You can specify the jars of your application to be included with
>> spark-submit
>> with the /--jars/ switch.
>>
>> Otherwise, are you sure that your newly compiled spark jar assembly is in
>> assembly/target/scala-2.10/?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352p23400.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> 
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
>>
>>
>


Re: Implementing top() using treeReduce()

2015-06-17 Thread Raghav Shankar
So, would I add the assembly jar to just the master, or would I have to add
it to all the slaves/workers too?

Thanks,
Raghav

> On Jun 17, 2015, at 5:13 PM, DB Tsai  wrote:
> 
> You need to build the spark assembly with your modification and deploy
> into cluster.
> 
> Sincerely,
> 
> DB Tsai
> --
> Blog: https://www.dbtsai.com
> PGP Key ID: 0xAF08DF8D
> 
> 
> On Wed, Jun 17, 2015 at 5:11 PM, Raghav Shankar  
> wrote:
>> I’ve implemented this in the suggested manner. When I build Spark and attach
>> the new spark-core jar to my eclipse project, I am able to use the new
>> method. In order to conduct the experiments I need to launch my app on a
>> cluster. I am using EC2. When I setup my master and slaves using the EC2
>> setup scripts, it sets up spark, but I think my custom built spark-core jar
>> is not being used. How do I set it up on EC2 so that my custom version of
>> spark-core is used?
>> 
>> Thanks,
>> Raghav
>> 
>> On Jun 9, 2015, at 7:41 PM, DB Tsai  wrote:
>> 
>> Having the following code in RDD.scala works for me. PS, in the following
>> code, I merge the smaller queue into larger one. I wonder if this will help
>> performance. Let me know when you do the benchmark.
>> 
>> def treeTakeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
>>   if (num == 0) {
>>     Array.empty
>>   } else {
>>     val mapRDDs = mapPartitions { items =>
>>       // Priority keeps the largest elements, so let's reverse the ordering.
>>       val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
>>       queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
>>       Iterator.single(queue)
>>     }
>>     if (mapRDDs.partitions.length == 0) {
>>       Array.empty
>>     } else {
>>       mapRDDs.treeReduce { (queue1, queue2) =>
>>         if (queue1.size > queue2.size) {
>>           queue1 ++= queue2
>>           queue1
>>         } else {
>>           queue2 ++= queue1
>>           queue2
>>         }
>>       }.toArray.sorted(ord)
>>     }
>>   }
>> }
>> 
>> def treeTop(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
>>   treeTakeOrdered(num)(ord.reverse)
>> }
>> 
>> 
>> 
>> Sincerely,
>> 
>> DB Tsai
>> --
>> Blog: https://www.dbtsai.com
>> PGP Key ID: 0xAF08DF8D
>> 
>> On Tue, Jun 9, 2015 at 10:09 AM, raggy  wrote:
>>> 
>>> I am trying to implement top-k in scala within apache spark. I am aware
>>> that
>>> spark has a top action. But, top() uses reduce(). Instead, I would like to
>>> use treeReduce(). I am trying to compare the performance of reduce() and
>>> treeReduce().
>>> 
>>> The main issue I have is that I cannot use these 2 lines of code which are
>>> used in the top() action within my Spark application.
>>> 
>>> val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
>>> queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
>>> 
>>> How can I go about implementing top() using treeReduce()?
>>> 
>>> 
>>> 
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Implementing-top-using-treeReduce-tp23227.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>> 
>> 
>> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Implementing top() using treeReduce()

2015-06-17 Thread Raghav Shankar
I’ve implemented this in the suggested manner. When I build Spark and attach 
the new spark-core jar to my eclipse project, I am able to use the new method. 
In order to conduct the experiments I need to launch my app on a cluster. I am 
using EC2. When I set up my master and slaves using the EC2 setup scripts, it 
sets up Spark, but I think my custom-built spark-core jar is not being used. 
How do I set it up on EC2 so that my custom version of spark-core is used?

Thanks,
Raghav

> On Jun 9, 2015, at 7:41 PM, DB Tsai  wrote:
> 
> Having the following code in RDD.scala works for me. PS, in the following 
> code, I merge the smaller queue into larger one. I wonder if this will help 
> performance. Let me know when you do the benchmark.
> def treeTakeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = 
> withScope {
>   if (num == 0) {
> Array.empty
>   } else {
> val mapRDDs = mapPartitions { items =>
>   // Priority keeps the largest elements, so let's reverse the ordering.
>   val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
>   queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
>   Iterator.single(queue)
> }
> if (mapRDDs.partitions.length == 0) {
>   Array.empty
> } else {
>   mapRDDs.treeReduce { (queue1, queue2) =>
> if (queue1.size > queue2.size) {
>   queue1 ++= queue2
>   queue1
> } else {
>   queue2 ++= queue1
>   queue2
> }
>   }.toArray.sorted(ord)
> }
>   }
> }
> 
> def treeTop(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
>   treeTakeOrdered(num)(ord.reverse)
> }
> 
> 
> Sincerely,
> 
> DB Tsai
> --
> Blog: https://www.dbtsai.com
> PGP Key ID: 0xAF08DF8D
> 
> On Tue, Jun 9, 2015 at 10:09 AM, raggy  wrote:
> I am trying to implement top-k in scala within apache spark. I am aware that
> spark has a top action. But, top() uses reduce(). Instead, I would like to
> use treeReduce(). I am trying to compare the performance of reduce() and
> treeReduce().
> 
> The main issue I have is that I cannot use these 2 lines of code which are
> used in the top() action within my Spark application.
> 
> val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
> queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
> 
> How can I go about implementing top() using treeReduce()?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Implementing-top-using-treeReduce-tp23227.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 



Re: Submitting Spark Applications using Spark Submit

2015-06-16 Thread Raghav Shankar
To clarify, I am using the spark standalone cluster.

On Tuesday, June 16, 2015, Yanbo Liang  wrote:

> If you run Spark on YARN, the simplest way is replace the
> $SPARK_HOME/lib/spark-.jar with your own version spark jar file and run
> your application.
> The spark-submit script will upload this jar to YARN cluster automatically
> and then you can run your application as usual.
> It does not care about which version of Spark in your YARN cluster.
>
> 2015-06-17 10:42 GMT+08:00 Raghav Shankar  >:
>
>> The documentation says spark.driver.userClassPathFirst can only be used
>> in cluster mode. Does this mean I have to set the --deploy-mode option
>> for spark-submit to cluster? Or can I still use the default client? My
>> understanding is that even in the default deploy mode, spark still uses
>> the slave machines I have on ec2.
>>
>> Also, the spark.driver.extraLibraryPath property mentions that I can
>> provide a path for special libraries on the spark-submit command line
>> options. Do my jar files in this path have to be the same name as the jar
>> used by spark, or is it intelligent enough to identify that two jars are
>> supposed to be the same thing? If they are supposed to be the same name,
>> how can I find out the name I should use for my jar? Eg: If I just name my
>> modified spark-core jar as spark.jar and put in a lib folder and provide
>> the path of the folder to spark-submit would that be enough to tell Spark
>> to use that spark-core jar instead of the default?
>>
>> Thanks,
>> Raghav
>>
>> On Jun 16, 2015, at 7:19 PM, Will Briggs > > wrote:
>>
>> If this is research-only, and you don't want to have to worry about
>> updating the jars installed by default on the cluster, you can add your
>> custom Spark jar using the "spark.driver.extraLibraryPath" configuration
>> property when running spark-submit, and then use the experimental "
>> spark.driver.userClassPathFirst" config to force it to use yours.
>>
>> See here for more details and options:
>> https://spark.apache.org/docs/1.4.0/configuration.html
>>
>> On June 16, 2015, at 10:12 PM, Raghav Shankar > > wrote:
>>
>> I made the change so that I could implement top() using treeReduce(). A
>> member on here suggested I make the change in RDD.scala to accomplish that.
>> Also, this is for a research project, and not for commercial use.
>>
>> So, any advice on how I can get the spark submit to use my custom built
>> jars would be very useful.
>>
>> Thanks,
>> Raghav
>>
>> On Jun 16, 2015, at 6:57 PM, Will Briggs > > wrote:
>>
>> In general, you should avoid making direct changes to the Spark source
>> code. If you are using Scala, you can seamlessly blend your own methods on
>> top of the base RDDs using implicit conversions.
>>
>> Regards,
>> Will
>>
>> On June 16, 2015, at 7:53 PM, raggy > > wrote:
>>
>> I am trying to submit a spark application using the command line. I used
>> the
>> spark submit command for doing so. I initially setup my Spark application
>> on
>> Eclipse and have been making changes on there. I recently obtained my own
>> version of the Spark source code and added a new method to RDD.scala. I
>> created a new spark core jar using mvn, and I added it to my eclipse build
>> path. My application ran perfectly fine.
>>
>> Now, I would like to submit it through the command line. I submitted my
>> application like this:
>>
>> bin/spark-submit --master local[2] --class "SimpleApp"
>> /Users/XXX/Desktop/spark2.jar
>>
>> The spark-submit command is within the spark project that I modified by
>> adding new methods.
>> When I do so, I get this error:
>>
>> java.lang.NoSuchMethodError:
>> org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object;
>> at SimpleApp$.main(SimpleApp.scala:12)
>> at SimpleApp.main(SimpleApp.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>> at
>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at
>>
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala

Re: Submitting Spark Applications using Spark Submit

2015-06-16 Thread Raghav Shankar
The documentation says spark.driver.userClassPathFirst can only be used in 
cluster mode. Does this mean I have to set the --deploy-mode option for 
spark-submit to cluster? Or can I still use the default client mode? My 
understanding is that even in the default deploy mode, Spark still uses the 
slave machines I have on EC2. 

Also, the spark.driver.extraLibraryPath property mentions that I can provide a 
path for special libraries via the spark-submit command line options. Do the jar 
files in this path have to have the same name as the jar used by Spark, or is it 
intelligent enough to identify that two jars are supposed to be the same thing? 
If they have to have the same name, how can I find out the name I should use for 
my jar? E.g., if I just name my modified spark-core jar spark.jar, put it in a 
lib folder, and provide the path of that folder to spark-submit, would that be 
enough to tell Spark to use that spark-core jar instead of the default?

Thanks,
Raghav

> On Jun 16, 2015, at 7:19 PM, Will Briggs  wrote:
> 
> If this is research-only, and you don't want to have to worry about updating 
> the jars installed by default on the cluster, you can add your custom Spark 
> jar using the "spark.driver.extraLibraryPath" configuration property when 
> running spark-submit, and then use the experimental " 
> spark.driver.userClassPathFirst" config to force it to use yours.
> 
> See here for more details and options: 
> https://spark.apache.org/docs/1.4.0/configuration.html
> 
> On June 16, 2015, at 10:12 PM, Raghav Shankar  wrote:
> 
> I made the change so that I could implement top() using treeReduce(). A 
> member on here suggested I make the change in RDD.scala to accomplish that. 
> Also, this is for a research project, and not for commercial use. 
> 
> So, any advice on how I can get the spark submit to use my custom built jars 
> would be very useful.
> 
> Thanks,
> Raghav
> 
>> On Jun 16, 2015, at 6:57 PM, Will Briggs  wrote:
>> 
>> In general, you should avoid making direct changes to the Spark source code. 
>> If you are using Scala, you can seamlessly blend your own methods on top of 
>> the base RDDs using implicit conversions.
>> 
>> Regards,
>> Will
>> 
>> On June 16, 2015, at 7:53 PM, raggy  wrote:
>> 
>> I am trying to submit a spark application using the command line. I used the
>> spark submit command for doing so. I initially setup my Spark application on
>> Eclipse and have been making changes on there. I recently obtained my own
>> version of the Spark source code and added a new method to RDD.scala. I
>> created a new spark core jar using mvn, and I added it to my eclipse build
>> path. My application ran perfectly fine. 
>> 
>> Now, I would like to submit it through the command line. I submitted my
>> application like this:
>> 
>> bin/spark-submit --master local[2] --class "SimpleApp"
>> /Users/XXX/Desktop/spark2.jar
>> 
>> The spark-submit command is within the spark project that I modified by
>> adding new methods.
>> When I do so, I get this error:
>> 
>> java.lang.NoSuchMethodError:
>> org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object;
>>  at SimpleApp$.main(SimpleApp.scala:12)
>>  at SimpleApp.main(SimpleApp.scala)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>>  at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>>  at java.lang.reflect.Method.invoke(Method.java:597)
>>  at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>>  at 
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>>  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>>  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>>  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> 
>> When I use spark submit, where does the jar come from? How do I make sure it
>> uses the jars that have built? 
>> 
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
> 



Re: Submitting Spark Applications using Spark Submit

2015-06-16 Thread Raghav Shankar
I made the change so that I could implement top() using treeReduce(). A member 
on here suggested I make the change in RDD.scala to accomplish that. Also, this 
is for a research project, and not for commercial use. 

So, any advice on how I can get the spark submit to use my custom built jars 
would be very useful.

Thanks,
Raghav

> On Jun 16, 2015, at 6:57 PM, Will Briggs  wrote:
> 
> In general, you should avoid making direct changes to the Spark source code. 
> If you are using Scala, you can seamlessly blend your own methods on top of 
> the base RDDs using implicit conversions.
> 
> Regards,
> Will
> 
> On June 16, 2015, at 7:53 PM, raggy  wrote:
> 
> I am trying to submit a spark application using the command line. I used the
> spark submit command for doing so. I initially setup my Spark application on
> Eclipse and have been making changes on there. I recently obtained my own
> version of the Spark source code and added a new method to RDD.scala. I
> created a new spark core jar using mvn, and I added it to my eclipse build
> path. My application ran perfectly fine. 
> 
> Now, I would like to submit it through the command line. I submitted my
> application like this:
> 
> bin/spark-submit --master local[2] --class "SimpleApp"
> /Users/XXX/Desktop/spark2.jar
> 
> The spark-submit command is within the spark project that I modified by
> adding new methods.
> When I do so, I get this error:
> 
> java.lang.NoSuchMethodError:
> org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object;
>   at SimpleApp$.main(SimpleApp.scala:12)
>   at SimpleApp.main(SimpleApp.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>   at java.lang.reflect.Method.invoke(Method.java:597)
>   at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>   at 
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 
> When I use spark submit, where does the jar come from? How do I make sure it
> uses the jars that have built? 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
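
[Editor's note: the implicit-conversion approach Will suggests above avoids rebuilding Spark at all. A rough sketch follows; the RDDExtensions/treeTop names are just examples, and sorting whole partitions is a simplified stand-in for Spark's bounded-priority-queue logic.]

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

object RDDExtensions {
  // Enriches any RDD with a treeTop method without touching Spark's sources.
  implicit class TopViaTreeReduce[T: ClassTag](rdd: RDD[T]) {
    def treeTop(num: Int)(implicit ord: Ordering[T]): Array[T] = {
      // Each partition keeps only its own top `num` elements, sorted descending.
      val perPartition = rdd.mapPartitions { items =>
        Iterator.single(items.toArray.sorted(ord.reverse).take(num))
      }
      // Merge the per-partition arrays pairwise in a tree of reductions.
      perPartition.treeReduce((a, b) => (a ++ b).sorted(ord.reverse).take(num))
    }
  }
}

// Usage: `import RDDExtensions._` in the application, then rdd.treeTop(10)
// resolves against the implicit class and ships with the application jar,
// so the stock Spark assembly on the cluster can stay as-is.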



Re: Different Sorting RDD methods in Apache Spark

2015-06-09 Thread Raghav Shankar
Thank you for you responses!

You mention that it only works as long as the data fits on a single
machine. What I am tying to do is receive the sorted contents of my
dataset. For this to be possible, the entire dataset should be able to fit
on a single machine. Are you saying that sorting the entire data and
collecting it on the driver node is not a typical use case? If I want to do
this using sortBy(), I would first call sortBy() followed by a collect().
Collect() would involve gathering all the data on a single machine as well.

Thanks,
Raghav

On Tuesday, June 9, 2015, Mark Hamstra  wrote:

> Correct.  Trading away scalability for increased performance is not an
> option for the standard Spark API.
>
> On Tue, Jun 9, 2015 at 3:05 AM, Daniel Darabos <
> daniel.dara...@lynxanalytics.com
> > wrote:
>
>> It would be even faster to load the data on the driver and sort it there
>> without using Spark :). Using reduce() is cheating, because it only works
>> as long as the data fits on one machine. That is not the targeted use case
>> of a distributed computation system. You can repeat your test with more
>> data (that doesn't fit on one machine) to see what I mean.
>>
>> On Tue, Jun 9, 2015 at 8:30 AM, raggy > > wrote:
>>
>>> For a research project, I tried sorting the elements in an RDD. I did
>>> this in
>>> two different approaches.
>>>
>>> In the first method, I applied a mapPartitions() function on the RDD, so
>>> that it would sort the contents of the RDD, and provide a result RDD that
>>> contains the sorted list as the only record in the RDD. Then, I applied a
>>> reduce function which basically merges sorted lists.
>>>
>>> I ran these experiments on an EC2 cluster containing 30 nodes. I set it
>>> up
>>> using the spark ec2 script. The data file was stored in HDFS.
>>>
>>> In the second approach I used the sortBy method in Spark.
>>>
>>> I performed these operations on the US census data (100 MB) found here
>>>
>>> A single line looks like this:
>>>
>>> 9, Not in universe, 0, 0, Children, 0, Not in universe, Never married,
>>> Not
>>> in universe or children, Not in universe, White, All other, Female, Not
>>> in
>>> universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler,
>>> Not
>>> in universe, Not in universe, Child <18 never marr not in subfamily,
>>> Child
>>> under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not
>>> in
>>> universe, 0, Both parents present, United-States, United-States,
>>> United-States, Native- Born in the United States, 0, Not in universe, 0,
>>> 0,
>>> 94, - 5.
>>> I sorted based on the 25th value in the CSV. In this line that is
>>> 1758.14.
>>>
>>> I noticed that sortBy performs worse than the other method. Is this the
>>> expected scenario? If it is, why wouldn't the mapPartitions() and
>>> reduce()
>>> be the default sorting approach?
>>>
>>> Here is my implementation
>>>
>>> public static void sortBy(JavaSparkContext sc){
>>>     JavaRDD<String> rdd = sc.textFile("/data.txt",32);
>>>     long start = System.currentTimeMillis();
>>>     rdd.sortBy(new Function<String, Double>(){
>>>
>>>       @Override
>>>       public Double call(String v1) throws Exception {
>>>         // TODO Auto-generated method stub
>>>         String [] arr = v1.split(",");
>>>         return Double.parseDouble(arr[24]);
>>>       }
>>>     }, true, 9).collect();
>>>     long end = System.currentTimeMillis();
>>>     System.out.println("SortBy: " + (end - start));
>>> }
>>>
>>> public static void sortList(JavaSparkContext sc){
>>>     JavaRDD<String> rdd = sc.textFile("/data.txt",32); //parallelize(l, 8);
>>>     long start = System.currentTimeMillis();
>>>     JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 =
>>>         rdd.mapPartitions(new FlatMapFunction<Iterator<String>, LinkedList<Tuple2<Double, String>>>(){
>>>
>>>       @Override
>>>       public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t)
>>>           throws Exception {
>>>         // TODO Auto-generated method stub
>>>         LinkedList<Tuple2<Double, String>> lines = new LinkedList<Tuple2<Double, String>>();
>>>         while(t.hasNext()){
>>>           String s = t.next();
>>>           String arr1[] = s.split(",");
>>>           Tuple2<Double, String> t1 = new Tuple2<Double, String>(Double.parseDouble(arr1[24]),s);
>>>           lines.add(t1);
>>>         }
>>>         Collections.sort(lines, new IncomeComparator());
>>>         LinkedList<LinkedList<Tuple2<Double, String>>> list = new LinkedList<LinkedList<Tuple2<Double, String>>>();
>>>         list.add(lines);
>>>         return list;
>>>       }
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Different-Sorting-RDD-methods-in-Apache-Spark-tp23214.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> 
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>> 
>>>
>>>
>>
>


Re: TreeReduce Functionality in Spark

2015-06-04 Thread Raghav Shankar
Hey DB,

Thanks for the reply!

I still don't think this answers my question. For example, if I have a
top() action being executed and I have 32 workers(32 partitions), and I
choose a depth of 4, what does the overlay of intermediate reducers look
like? How many reducers are there excluding the master and the worker? How
many partitions get sent to each of these intermediate reducers? Does this
number vary at each level?

Thanks!

On Thursday, June 4, 2015, DB Tsai  wrote:

> By default, the depth of the tree is 2. Each partition will be one node.
>
> Sincerely,
>
> DB Tsai
> ---
> Blog: https://www.dbtsai.com
>
>
> On Thu, Jun 4, 2015 at 10:46 AM, Raghav Shankar  > wrote:
> > Hey Reza,
> >
> > Thanks for your response!
> >
> > Your response clarifies some of my initial thoughts. However, what I
> don't
> > understand is how the depth of the tree is used to identify how many
> > intermediate reducers there will be, and how many partitions are sent to
> the
> > intermediate reducers. Could you provide some insight into this?
> >
> > Thanks,
> > Raghav
> >
> > On Thursday, June 4, 2015, Reza Zadeh  > wrote:
> >>
> >> In a regular reduce, all partitions have to send their reduced value to
> a
> >> single machine, and that machine can become a bottleneck.
> >>
> >> In a treeReduce, the partitions talk to each other in a logarithmic
> number
> >> of rounds. Imagine a binary tree that has all the partitions at its
> leaves
> >> and the root will contain the final reduced value. This way there is no
> >> single bottleneck machine.
> >>
> >> It remains to decide the number of children each node should have and
> how
> >> deep the tree should be, which is some of the logic in the method you
> >> pasted.
> >>
> >> On Wed, Jun 3, 2015 at 7:10 PM, raggy  > wrote:
> >>>
> >>> I am trying to understand what the treeReduce function for an RDD does,
> >>> and
> >>> how it is different from the normal reduce function. My current
> >>> understanding is that treeReduce tries to split up the reduce into
> >>> multiple
> >>> steps. We do a partial reduce on different nodes, and then a final
> reduce
> >>> is
> >>> done to get the final result. Is this correct? If so, I guess what I am
> >>> curious about is, how does spark decide how many nodes will be on each
> >>> level, and how many partitions will be sent to a given node?
> >>>
> >>> The bulk of the implementation is within this function:
> >>>
> >>> partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
> >>>   .getOrElse(throw new UnsupportedOperationException("empty
> >>> collection"))
> >>>
> >>> The above function is expanded to
> >>>
> >>> val cleanSeqOp = context.clean(seqOp)
> >>>   val cleanCombOp = context.clean(combOp)
> >>>   val aggregatePartition =
> >>> (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp,
> >>> cleanCombOp)
> >>>   var partiallyAggregated = mapPartitions(it =>
> >>> Iterator(aggregatePartition(it)))
> >>>   var numPartitions = partiallyAggregated.partitions.length
> >>>   val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 /
> >>> depth)).toInt, 2)
> >>>   // If creating an extra level doesn't help reduce
> >>>   // the wall-clock time, we stop tree aggregation.
> >>>   while (numPartitions > scale + numPartitions / scale) {
> >>> numPartitions /= scale
> >>> val curNumPartitions = numPartitions
> >>> partiallyAggregated =
> partiallyAggregated.mapPartitionsWithIndex
> >>> {
> >>>   (i, iter) => iter.map((i % curNumPartitions, _))
> >>> }.reduceByKey(new HashPartitioner(curNumPartitions),
> >>> cleanCombOp).values
> >>>   }
> >>>   partiallyAggregated.reduce(cleanCombOp)
> >>>
> >>> I am completely lost about what is happening in this function. I would
> >>> greatly appreciate some sort of explanation.
> >>>
> >>>
> >>>
> >>>
> >>> --
> >>> View this message in context:
> >>>
> http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html
> >>> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >>>
> >>> -
> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> 
> >>> For additional commands, e-mail: user-h...@spark.apache.org
> 
> >>>
> >>
> >
>
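
[Editor's note: to make the fan-in concrete for the 32-partition, depth-4 case asked about above, the level-collapsing loop quoted from treeAggregate can be replayed on its own. This is a sketch of just the partition-count arithmetic from that quoted code, not of the reduceByKey merging Spark performs at each level.]

def levelSizes(initialPartitions: Int, depth: Int): Seq[Int] = {
  val scale = math.max(math.ceil(math.pow(initialPartitions, 1.0 / depth)).toInt, 2)
  var numPartitions = initialPartitions
  val sizes = scala.collection.mutable.ArrayBuffer(numPartitions)
  // Same stopping rule as the quoted code: add a level only while it still
  // shrinks the work left for the final reduce.
  while (numPartitions > scale + numPartitions / scale) {
    numPartitions /= scale
    sizes += numPartitions
  }
  sizes.toList
}

// levelSizes(32, 4) == List(32, 10, 3): scale = ceil(32^(1/4)) = 3, so the 32
// per-partition results collapse into 10 partitions, then into 3, and those 3
// values are reduced at the end. The "intermediate reducers" are ordinary
// tasks running on the existing executors, not separate machines.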


Re: TreeReduce Functionality in Spark

2015-06-04 Thread Raghav Shankar
Hey Reza,

Thanks for your response!

Your response clarifies some of my initial thoughts. However, what I don't
understand is how the depth of the tree is used to identify how many
intermediate reducers there will be, and how many partitions are sent to
the intermediate reducers. Could you provide some insight into this?

Thanks,
Raghav

On Thursday, June 4, 2015, Reza Zadeh  wrote:

> In a regular reduce, all partitions have to send their reduced value to a
> single machine, and that machine can become a bottleneck.
>
> In a treeReduce, the partitions talk to each other in a logarithmic number
> of rounds. Imagine a binary tree that has all the partitions at its leaves
> and the root will contain the final reduced value. This way there is no
> single bottleneck machine.
>
> It remains to decide the number of children each node should have and how
> deep the tree should be, which is some of the logic in the method you
> pasted.
>
> On Wed, Jun 3, 2015 at 7:10 PM, raggy  > wrote:
>
>> I am trying to understand what the treeReduce function for an RDD does,
>> and
>> how it is different from the normal reduce function. My current
>> understanding is that treeReduce tries to split up the reduce into
>> multiple
>> steps. We do a partial reduce on different nodes, and then a final reduce
>> is
>> done to get the final result. Is this correct? If so, I guess what I am
>> curious about is, how does spark decide how many nodes will be on each
>> level, and how many partitions will be sent to a given node?
>>
>> The bulk of the implementation is within this function:
>>
>> partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
>>   .getOrElse(throw new UnsupportedOperationException("empty
>> collection"))
>>
>> The above function is expanded to
>>
>> val cleanSeqOp = context.clean(seqOp)
>>   val cleanCombOp = context.clean(combOp)
>>   val aggregatePartition =
>> (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp,
>> cleanCombOp)
>>   var partiallyAggregated = mapPartitions(it =>
>> Iterator(aggregatePartition(it)))
>>   var numPartitions = partiallyAggregated.partitions.length
>>   val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 /
>> depth)).toInt, 2)
>>   // If creating an extra level doesn't help reduce
>>   // the wall-clock time, we stop tree aggregation.
>>   while (numPartitions > scale + numPartitions / scale) {
>> numPartitions /= scale
>> val curNumPartitions = numPartitions
>> partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
>>   (i, iter) => iter.map((i % curNumPartitions, _))
>> }.reduceByKey(new HashPartitioner(curNumPartitions),
>> cleanCombOp).values
>>   }
>>   partiallyAggregated.reduce(cleanCombOp)
>>
>> I am completely lost about what is happening in this function. I would
>> greatly appreciate some sort of explanation.
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> 
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
>>
>>
>


Re: Task result in Spark Worker Node

2015-04-17 Thread Raghav Shankar
My apologies, I had pasted the wrong exception trace in the previous email. 
Here is the actual exception that I am receiving. 

Exception in thread "main" java.lang.NullPointerException
at 
org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:154)
at 
org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

> On Apr 17, 2015, at 2:30 AM, Raghav Shankar  wrote:
> 
> Hey Imran, 
> 
>  Thanks for the great explanation! This cleared up a lot of things for me. I 
> am actually trying to utilize some of the features within Spark for a system 
> I am developing. I am currently working on developing a subsystem that can be 
> integrated within Spark and other Big Data solutions. In order to integrate 
> it within Spark, I am trying to utilize the rdds and functions provided to 
> the reduce method on my system. My system is developed in Scala and Java. In 
> Spark, I have seen that the function provided to the reduce method, along 
> with the RDD, gets serialized and sent to the worker nodes. The worker nodes 
> are able to deserialize them and then execute the task on them. I see this 
> happening in ResultTask.scala. When I try to do something similar, I get 
> exceptions. The system I am developing has Spark jars in its build path, so 
> it is able to create a SparkContext etc. 
> 
> When I do, 
> 
> val bytes = closureSerializer.serialize((rdd, func) : AnyRef).array() 
> (similar to DAGScheduler.scala)
> val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, 
> Iterator[Int]) => Int)](
>   ByteBuffer.wrap(bytes), Thread.currentThread.getContextClassLoader)
> println(func2(context, rdd2.iterator(rdd2.partitions(1), context)));
> 
> I get the proper result and can print it out. 
> 
> But when I involve the network by serializing the data, using the network to 
> send it to a different program, then deserialize the data and use the 
> function, I get the following error:
> 
> Exception in thread "main" java.lang.NullPointerException
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
>   at SimpleApp$$anonfun$1.apply(SimpleApp.scala:31)
>   at SimpleApp$$anonfun$1.apply(SimpleApp.scala:30)
>   at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
>   at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
>   at SimpleApp$.net(SimpleApp.scala:71)
>   at SimpleApp$.main(SimpleApp.scala:76)
>   at SimpleApp.main(SimpleApp.scala)
> 
> I have also made sure that I am adding the class file of the program that is 
> sending the serialized data to the bin folder of the program that is 
> receiving the data. I’m not sure what I am doing wrong. I’ve done the 
> serialization and creation of the function similar to how Spark does it. I 
> created another reduce function like this. When implemented this way, it 
> prints out the result of func2 properly. But when I involve the network by 
> sending the serialized data to another program, I get the above exception. 
> 
>def reduceMod(f: (Integer, Integer) => Integer): Integer = {
> val reducePartition: Iterator[Integer] => Option[Integer] = iter => {
>   if (iter.hasNext) {
> Some(iter.reduceLeft(f))
>   } else {
> None
>   }
> }
> val processFunc = (context: TaskContext, iter: Iterator[Integer]) => 
> reducePartition(iter)
> val func = processFunc.asInstanceOf[(TaskContext, Iterator[Int]) => Int]
> context = new TaskContextImpl(stageId = 1, partitionId = 1,
>   taskAttemptId = 1, attemptNumber = 1, runningLocally = false)
> println(func.getClass.getName);
> println(func(context, rdd.iterator(rdd.partitions(1), context)));
> val bb = closureSerializer.serialize((rdd, func) : AnyRef).array()
> val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], 
> (TaskContext, Iterator[Int]) => Int)](
>   ByteBuffer.wrap(bb), Thread.currentThread.getContextClassLoader)
> println(func2(context, rdd3.iterator(rdd3.partitions(1), context)));
> 1
>   }
>  
> I was wondering if you had any ideas on what I am doing wrong, or how I can 
> properly send the serialized version of the RDD and function to my other 
> program. My thought is that I might need to add more jars to the build path, 
> but I have no clue if that's the issue or what jars I need to add. 
> 
> Thanks,
> Raghav
> 
>> On Ap

Re: Task result in Spark Worker Node

2015-04-17 Thread Raghav Shankar
Hey Imran, 

 Thanks for the great explanation! This cleared up a lot of things for me. I am 
actually trying to utilize some of the features within Spark for a system I am 
developing. I am currently working on developing a subsystem that can be 
integrated within Spark and other Big Data solutions. In order to integrate it 
within Spark, I am trying to utilize the rdds and functions provided to the 
reduce method on my system. My system is developed in Scala and Java. In Spark, 
I have seen that the function provided to the reduce method, along with the 
RDD, gets serialized and sent to the worker nodes. The worker nodes are able to 
deserialize them and then execute the task on them. I see this happening in 
ResultTask.scala. When I try to do something similar, I get exceptions. The 
system I am developing has Spark jars in its build path, so it is able to 
create a SparkContext etc. 

When I do, 

val bytes = closureSerializer.serialize((rdd, func) : AnyRef).array() (similar 
to DAGScheduler.scala)
val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, 
Iterator[Int]) => Int)](
  ByteBuffer.wrap(bytes), Thread.currentThread.getContextClassLoader)
println(func2(context, rdd2.iterator(rdd2.partitions(1), context)));

I get the proper result and can print it out. 

But when I involve the network by serializing the data, using the network to 
send it to a different program, then deserialize the data and use the function, 
I get the following error:

Exception in thread "main" java.lang.NullPointerException
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
at SimpleApp$$anonfun$1.apply(SimpleApp.scala:31)
at SimpleApp$$anonfun$1.apply(SimpleApp.scala:30)
at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
at SimpleApp$.net(SimpleApp.scala:71)
at SimpleApp$.main(SimpleApp.scala:76)
at SimpleApp.main(SimpleApp.scala)

I have also made sure that I am adding the class file of the program that is 
sending the serialized data to the bin folder of the program that is receiving 
the data. I’m not sure what I am doing wrong. I’ve done the serialization and 
creation of the function similar to how Spark does it. I created another reduce 
function like this. When implemented this way, it prints out the result of 
func2 properly. But when I involve the network by sending the serialized data 
to another program, I get the above exception. 

   def reduceMod(f: (Integer, Integer) => Integer): Integer = {
val reducePartition: Iterator[Integer] => Option[Integer] = iter => {
  if (iter.hasNext) {
Some(iter.reduceLeft(f))
  } else {
None
  }
}
val processFunc = (context: TaskContext, iter: Iterator[Integer]) => 
reducePartition(iter)
val func = processFunc.asInstanceOf[(TaskContext, Iterator[Int]) => Int]
context = new TaskContextImpl(stageId = 1, partitionId = 1,
  taskAttemptId = 1, attemptNumber = 1, runningLocally = false)
println(func.getClass.getName);
println(func(context, rdd.iterator(rdd.partitions(1), context)));
val bb = closureSerializer.serialize((rdd, func) : AnyRef).array()
val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, 
Iterator[Int]) => Int)](
  ByteBuffer.wrap(bb), Thread.currentThread.getContextClassLoader)
println(func2(context, rdd3.iterator(rdd3.partitions(1), context)));
1
  }
 
I was wondering if you had any ideas on what I am doing wrong, or how I can 
properly send the serialized version of the RDD and function to my other 
program. My thought is that I might need to add more jars to the build path, 
but I have no clue if that's the issue or what jars I need to add. 

Thanks,
Raghav

> On Apr 13, 2015, at 10:22 PM, Imran Rashid  wrote:
> 
> On the worker side, it all happens in Executor.  The task result is computed 
> here:
> 
> https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210
> 
> then its serialized along with some other goodies, and finally sent back to 
> the driver here:
> 
> https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L255
> 
> What happens on the driver is quite a bit more complicated, and involves a 
> number of spots in the code, but at least to get you started, the results are 
> received here:
> 
> https://github.com

Re: Sending RDD object over the network

2015-04-06 Thread Raghav Shankar
Hey Akhil,

 Thanks for your response! No, I am not expecting to receive the values
themselves. I am just trying to receive the RDD object on my second Spark
application. However, I get a NPE when I try to use the object within my
second program. Would you know how I can properly send the RDD object to my
second program?

Thanks,
Raghav

On Mon, Apr 6, 2015 at 3:08 AM, Akhil Das 
wrote:

> Are you expecting to receive 1 to 100 values in your second program?
>
> RDD is just an abstraction, you would need to do like:
>
> num.foreach(x => send(x))
>
>
> Thanks
> Best Regards
>
> On Mon, Apr 6, 2015 at 1:56 AM, raggy  wrote:
>
>> For a class project, I am trying to utilize 2 spark Applications
>> communicate
>> with each other by passing an RDD object that was created from one
>> application to another Spark application. The first application is
>> developed
>> in Scala and creates an RDD and sends it to the 2nd application over the
>> network as follows:
>>
>> val logFile = "../../spark-1.3.0/README.md" // Should be some file on
>> your system
>> val conf = new SparkConf();
>> conf.setAppName("Simple Application").setMaster("local[2]")
>> val sc = new SparkContext(conf)
>> val nums = sc.parallelize(1 to 100, 2).toJavaRDD();
>> val s = new Socket("127.0.0.1", 8000);
>> val objectOutput = new ObjectOutputStream(s.getOutputStream());
>> objectOutput.writeObject(nums);
>> s.close();
>> The second Spark application is a Java application, which tries to receive
>> the RDD object and then perform some operations on it. At the moment, I am
>> trying to see if I have properly obtained the object.
>>
>> ServerSocket listener = null;
>> Socket client;
>>
>> try{
>> listener = new ServerSocket(8000);
>> }catch(Exception e){
>> e.printStackTrace();
>> }
>> System.out.println("Listening");
>> try{
>> client = listener.accept();
>> ObjectInputStream objectInput = new
>> ObjectInputStream(client.getInputStream());
>> Object object =(JavaRDD) objectInput.readObject();
>> JavaRDD tmp = (JavaRDD) object;
>>
>> if(tmp != null){
>> System.out.println(tmp.getStorageLevel().toString());
>> List p = tmp.partitions();
>> }
>> else{
>> System.out.println("variable is null");
>> }
>>
>> }catch(Exception e){
>> e.printStackTrace();
>> }
>> The output I get is:
>>
>> StorageLevel(false, false, false, false, 1)
>> java.lang.NullPointerException
>> at
>>
>> org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:154)
>> at
>>
>> org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>> at
>>
>> org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:56)
>> at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32)
>> at SimpleApp.main(SimpleApp.java:35)
>> So, System.out.println(tmp.getStorageLevel().toString()); prints out
>> properly. But, List p = tmp.partitions(); throws the
>> NullPointerException. I can't seem to figure out why this is happening.
>>
>> In a nutshell, I am basically trying to create an RDD object in one Spark
>> application and then send the object to another application. After
>> receiving
>> the object I try to make sure I received it properly by accessing its
>> methods. Invoking the partitions() method in the original Spark
>> application
>> does not throw any errors either. I would greatly appreciate any
>> suggestion
>> on how I can solve my problem, or an alternative solution for what I am
>> trying to accomplish.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Sending-RDD-object-over-the-network-tp22382.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>