No Sonal, I'm not doing any explicit call to stop the context. If you see my
previous post to Michael, the commented portion of the code is my requirement.
When I run this over the standalone spark cluster, the execution keeps running
with no output or error. After waiting for several minutes I kill it by
pressing Ctrl+C in the terminal.

But the same code runs perfectly when executed from the spark shell.

~Sarath
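(Sonal's suggestion below boils down to a single call at the end of the driver
program. A minimal sketch, assuming a SparkConf built as in the code later in
this thread; the surrounding names are illustrative only:)

    def main(args: Array[String]) {
      val sc = new SparkContext(conf)  // conf built as elsewhere in this thread
      // ... job logic: textFile, count, SQL, etc. ...
      sc.stop()  // explicit shutdown at the end of the driver
    }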
On Thu, Jul 17, 2014 at 1:05 PM, Sonal Goyal <sonalgoy...@gmail.com> wrote:

> Hi Sarath,
>
> Are you explicitly stopping the context?
>
> sc.stop()
>
> Best Regards,
> Sonal
> Nube Technologies <http://www.nubetech.co>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
> On Thu, Jul 17, 2014 at 12:51 PM, Sarath Chandra <
> sarathchandra.jos...@algofusiontech.com> wrote:
>
>> Hi Michael, Soumya,
>>
>> Can you please check and let me know what the issue is? What am I
>> missing? Let me know if you need any logs to analyze.
>>
>> ~Sarath
>>
>>
>> On Wed, Jul 16, 2014 at 8:24 PM, Sarath Chandra <
>> sarathchandra.jos...@algofusiontech.com> wrote:
>>
>>> Hi Michael,
>>>
>>> Tried it. It's correctly printing the line counts of both files.
>>> Here's what I tried -
>>>
>>> Code:
>>>
>>> package test
>>>
>>> import org.apache.spark.{SparkConf, SparkContext}
>>> import org.apache.spark.rdd.RDD
>>> import org.apache.spark.sql.SQLContext
>>>
>>> object Test4 {
>>>   case class Test(fld1: String,
>>>                   fld2: String,
>>>                   fld3: String,
>>>                   fld4: String,
>>>                   fld5: String,
>>>                   fld6: Double,
>>>                   fld7: String)
>>>
>>>   def main(args: Array[String]) {
>>>     val conf = new SparkConf()
>>>       .setMaster(args(0))
>>>       .setAppName("SQLTest")
>>>       .setSparkHome(args(1))
>>>       .set("spark.executor.memory", "2g")
>>>     val sc = new SparkContext(conf)
>>>     sc.addJar("test1-0.1.jar")
>>>     val file1 = sc.textFile(args(2))
>>>     println(file1.count())
>>>     val file2 = sc.textFile(args(3))
>>>     println(file2.count())
>>> //  val sq = new SQLContext(sc)
>>> //  import sq._
>>> //  val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l =>
>>> //    Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)))
>>> //  val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s =>
>>> //    Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)))
>>> //  val file1_schema = sq.createSchemaRDD(file1_recs)
>>> //  val file2_schema = sq.createSchemaRDD(file2_recs)
>>> //  file1_schema.registerAsTable("file1_tab")
>>> //  file2_schema.registerAsTable("file2_tab")
>>> //  val matched = sq.sql("select * from file1_tab l join file2_tab s on " +
>>> //    "l.fld7=s.fld7 where l.fld2=s.fld2 and " +
>>> //    "l.fld3=s.fld3 and l.fld4=s.fld4 and " +
>>> //    "l.fld6=s.fld6")
>>> //  matched.collect().foreach(println)
>>>   }
>>> }
>>>
>>> Execution:
>>>
>>> export CLASSPATH=$HADOOP_PREFIX/conf:$SPARK_HOME/lib/*:test1-0.1.jar
>>> export CONFIG_OPTS="-Dspark.jars=test1-0.1.jar"
>>> java -cp $CLASSPATH $CONFIG_OPTS test.Test4 spark://master:7077 \
>>>   "/usr/local/spark-1.0.1-bin-hadoop1" \
>>>   hdfs://master:54310/user/hduser/file1.csv \
>>>   hdfs://master:54310/user/hduser/file2.csv
>>>
>>> ~Sarath
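(One caution about the commented-out parsing above, independent of the hang:
split(",") drops trailing empty fields, and l(5).toDouble throws on a
non-numeric value, so a short or malformed row fails its task at runtime and
the stack trace lands in the executor logs rather than the driver console. A
guarded sketch — the parse helper below is illustrative, not from the thread:)

    def parse(line: String): Option[Test] = {
      val f = line.split(",", -1)              // -1 keeps trailing empty fields
      if (f.length < 7) None                   // skip short/malformed rows
      else try {
        Some(Test(f(0), f(1), f(2), f(3), f(4), f(5).toDouble, f(6)))
      } catch {
        case _: NumberFormatException => None  // skip rows with non-numeric fld6
      }
    }

    // val file1_recs: RDD[Test] = file1.flatMap(parse)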
>>> On Wed, Jul 16, 2014 at 8:14 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> What if you just run something like:
>>>>
>>>> sc.textFile("hdfs://localhost:54310/user/hduser/file1.csv").count()
>>>>
>>>>
>>>> On Wed, Jul 16, 2014 at 10:37 AM, Sarath Chandra <
>>>> sarathchandra.jos...@algofusiontech.com> wrote:
>>>>
>>>>> Yes Soumya, I did it.
>>>>>
>>>>> First I tried the example available in the documentation (the one
>>>>> using the people table and finding teenagers). After successfully
>>>>> running it, I moved on to this one, which is the starting point of a
>>>>> bigger requirement for which I'm evaluating Spark SQL.
>>>>>
>>>>>
>>>>> On Wed, Jul 16, 2014 at 7:59 PM, Soumya Simanta <
>>>>> soumya.sima...@gmail.com> wrote:
>>>>>
>>>>>> Can you try submitting a very simple job to the cluster?
>>>>>>
>>>>>> On Jul 16, 2014, at 10:25 AM, Sarath Chandra <
>>>>>> sarathchandra.jos...@algofusiontech.com> wrote:
>>>>>>
>>>>>> Yes, it is appearing on the Spark UI, and it remains there with
>>>>>> state "RUNNING" till I press Ctrl+C in the terminal to kill the
>>>>>> execution.
>>>>>>
>>>>>> Barring the statements to create the spark context, if I copy-paste
>>>>>> the lines of my code into the spark shell, it runs perfectly, giving
>>>>>> the desired output.
>>>>>>
>>>>>> ~Sarath
>>>>>>
>>>>>> On Wed, Jul 16, 2014 at 7:48 PM, Soumya Simanta <
>>>>>> soumya.sima...@gmail.com> wrote:
>>>>>>
>>>>>>> When you submit your job, it should appear on the Spark UI. Same
>>>>>>> with the REPL. Make sure your job is submitted to the cluster
>>>>>>> properly.
>>>>>>>
>>>>>>> On Wed, Jul 16, 2014 at 10:08 AM, Sarath Chandra <
>>>>>>> sarathchandra.jos...@algofusiontech.com> wrote:
>>>>>>>
>>>>>>>> Hi Soumya,
>>>>>>>>
>>>>>>>> Data is very small, 500+ lines in each file.
>>>>>>>>
>>>>>>>> Removed the last 2 lines and placed this at the end:
>>>>>>>> matched.collect().foreach(println). Still no luck. It's been more
>>>>>>>> than 5 min and the execution is still running.
>>>>>>>>
>>>>>>>> Checked the logs, nothing in stdout. In stderr I don't see
>>>>>>>> anything going wrong; all are info messages.
>>>>>>>>
>>>>>>>> What else do I need to check?
>>>>>>>>
>>>>>>>> ~Sarath
>>>>>>>>
>>>>>>>> On Wed, Jul 16, 2014 at 7:23 PM, Soumya Simanta <
>>>>>>>> soumya.sima...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Check your executor logs for the output, or if your data is not
>>>>>>>>> big, collect it in the driver and print it.
>>>>>>>>>
>>>>>>>>> On Jul 16, 2014, at 9:21 AM, Sarath Chandra <
>>>>>>>>> sarathchandra.jos...@algofusiontech.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I'm trying to do a simple record matching between 2 files and
>>>>>>>>> wrote the following code -
>>>>>>>>>
>>>>>>>>> import org.apache.spark.sql.SQLContext
>>>>>>>>> import org.apache.spark.rdd.RDD
>>>>>>>>>
>>>>>>>>> object SqlTest {
>>>>>>>>>   case class Test(fld1: String, fld2: String, fld3: String,
>>>>>>>>>       fld4: String, fld5: String, fld6: Double, fld7: String)
>>>>>>>>>   // sc: the SparkContext (predefined in the spark shell;
>>>>>>>>>   // created explicitly for the standalone run)
>>>>>>>>>   sc.addJar("test1-0.1.jar")
>>>>>>>>>   val file1 = sc.textFile("hdfs://localhost:54310/user/hduser/file1.csv")
>>>>>>>>>   val file2 = sc.textFile("hdfs://localhost:54310/user/hduser/file2.csv")
>>>>>>>>>   val sq = new SQLContext(sc)
>>>>>>>>>   val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l =>
>>>>>>>>>     Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)))
>>>>>>>>>   val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s =>
>>>>>>>>>     Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)))
>>>>>>>>>   val file1_schema = sq.createSchemaRDD(file1_recs)
>>>>>>>>>   val file2_schema = sq.createSchemaRDD(file2_recs)
>>>>>>>>>   file1_schema.registerAsTable("file1_tab")
>>>>>>>>>   file2_schema.registerAsTable("file2_tab")
>>>>>>>>>   val matched = sq.sql("select * from file1_tab l join file2_tab s " +
>>>>>>>>>     "on l.fld7=s.fld7 where l.fld2=s.fld2 and l.fld3=s.fld3 and " +
>>>>>>>>>     "l.fld4=s.fld4 and l.fld6=s.fld6")
>>>>>>>>>   val count = matched.count()
>>>>>>>>>   println("Found " + count + " matching records")
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> When I run this program on a standalone spark cluster, it keeps
>>>>>>>>> running for long with no output or error. After waiting for a few
>>>>>>>>> minutes I'm forcibly killing it.
>>>>>>>>> But the same program works fine when executed from a spark
>>>>>>>>> shell.
>>>>>>>>>
>>>>>>>>> What is going wrong? What am I missing?
>>>>>>>>>
>>>>>>>>> ~Sarath
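(To pull the thread's suggestions together, here is a consolidated sketch of
the same program as a self-contained standalone app, using only the calls
already shown above: explicit context creation, collect-and-print in the
driver, and the explicit sc.stop() Sonal asks about. The object name and
argument layout are illustrative; this is a sketch, not a confirmed fix for
the hang:)

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SQLContext

    object SqlTestStandalone {
      case class Test(fld1: String, fld2: String, fld3: String,
          fld4: String, fld5: String, fld6: Double, fld7: String)

      def main(args: Array[String]) {
        // args: <master> <sparkHome> <file1> <file2>, as with Test4 above
        val conf = new SparkConf()
          .setMaster(args(0))
          .setAppName("SqlTestStandalone")
          .setSparkHome(args(1))
        val sc = new SparkContext(conf)
        sc.addJar("test1-0.1.jar")  // ship the application jar to the executors
        try {
          val sq = new SQLContext(sc)
          val file1_recs: RDD[Test] = sc.textFile(args(2))
            .map(_.split(",")).map(l =>
              Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)))
          val file2_recs: RDD[Test] = sc.textFile(args(3))
            .map(_.split(",")).map(s =>
              Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)))
          sq.createSchemaRDD(file1_recs).registerAsTable("file1_tab")
          sq.createSchemaRDD(file2_recs).registerAsTable("file2_tab")
          val matched = sq.sql(
            "select * from file1_tab l join file2_tab s on l.fld7=s.fld7 " +
            "where l.fld2=s.fld2 and l.fld3=s.fld3 and " +
            "l.fld4=s.fld4 and l.fld6=s.fld6")
          matched.collect().foreach(println)  // bring results to the driver and print
        } finally {
          sc.stop()  // explicit shutdown so the driver can exit cleanly
        }
      }
    }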