Hi Michael, Soumya,

Can you please check and let me know what the issue is? What am I missing? Let me know if you need any logs to analyze.
~Sarath

On Wed, Jul 16, 2014 at 8:24 PM, Sarath Chandra
<sarathchandra.jos...@algofusiontech.com> wrote:

> Hi Michael,
>
> Tried it. It's correctly printing the line counts of both files.
> Here's what I tried -
>
> Code:
>
> package test
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.SQLContext
>
> object Test4 {
>   case class Test(fld1: String,
>                   fld2: String,
>                   fld3: String,
>                   fld4: String,
>                   fld5: String,
>                   fld6: Double,
>                   fld7: String);
>
>   def main(args: Array[String]) {
>     val conf = new SparkConf()
>       .setMaster(args(0))
>       .setAppName("SQLTest")
>       .setSparkHome(args(1))
>       .set("spark.executor.memory", "2g");
>     val sc = new SparkContext(conf);
>     sc.addJar("test1-0.1.jar");
>
>     val file1 = sc.textFile(args(2));
>     println(file1.count());
>
>     val file2 = sc.textFile(args(3));
>     println(file2.count());
>
> //  val sq = new SQLContext(sc);
> //  import sq._
> //  val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l => Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));
> //  val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s => Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));
> //  val file1_schema = sq.createSchemaRDD(file1_recs);
> //  val file2_schema = sq.createSchemaRDD(file2_recs);
> //  file1_schema.registerAsTable("file1_tab");
> //  file2_schema.registerAsTable("file2_tab");
> //  val matched = sq.sql("select * from file1_tab l join file2_tab s on " +
> //    "l.fld7=s.fld7 where l.fld2=s.fld2 and " +
> //    "l.fld3=s.fld3 and l.fld4=s.fld4 and " +
> //    "l.fld6=s.fld6");
> //  matched.collect().foreach(println);
>   }
> }
>
> Execution:
>
> export CLASSPATH=$HADOOP_PREFIX/conf:$SPARK_HOME/lib/*:test1-0.1.jar
> export CONFIG_OPTS="-Dspark.jars=test1-0.1.jar"
> java -cp $CLASSPATH $CONFIG_OPTS test.Test4 spark://master:7077 \
>   "/usr/local/spark-1.0.1-bin-hadoop1" \
>   hdfs://master:54310/user/hduser/file1.csv \
>   hdfs://master:54310/user/hduser/file2.csv
>
> ~Sarath
>
> On Wed, Jul 16, 2014 at 8:14 PM, Michael Armbrust
> <mich...@databricks.com> wrote:
>
>> What if you just run something like:
>>
>>   sc.textFile("hdfs://localhost:54310/user/hduser/file1.csv").count()
>>
>> On Wed, Jul 16, 2014 at 10:37 AM, Sarath Chandra
>> <sarathchandra.jos...@algofusiontech.com> wrote:
>>
>>> Yes Soumya, I did it.
>>>
>>> First I tried the example available in the documentation (the one using
>>> the people table and finding teenagers). After successfully running it, I
>>> moved on to this one, which is the starting point of a bigger requirement
>>> for which I'm evaluating Spark SQL.
>>>
>>> On Wed, Jul 16, 2014 at 7:59 PM, Soumya Simanta
>>> <soumya.sima...@gmail.com> wrote:
>>>
>>>> Can you try submitting a very simple job to the cluster?
>>>>
>>>> On Jul 16, 2014, at 10:25 AM, Sarath Chandra
>>>> <sarathchandra.jos...@algofusiontech.com> wrote:
>>>>
>>>> Yes, it is appearing on the Spark UI, and it remains there in the
>>>> "RUNNING" state till I press Ctrl+C in the terminal to kill the execution.
>>>>
>>>> Barring the statements that create the Spark context, if I copy-paste
>>>> the lines of my code into the Spark shell, it runs perfectly, giving the
>>>> desired output.
>>>>
>>>> ~Sarath
>>>>
>>>> On Wed, Jul 16, 2014 at 7:48 PM, Soumya Simanta
>>>> <soumya.sima...@gmail.com> wrote:
>>>>
>>>>> When you submit your job, it should appear on the Spark UI, same as
>>>>> with the REPL. Make sure your job is submitted to the cluster properly.
>>>>>
>>>>> On Wed, Jul 16, 2014 at 10:08 AM, Sarath Chandra
>>>>> <sarathchandra.jos...@algofusiontech.com> wrote:
>>>>>
>>>>>> Hi Soumya,
>>>>>>
>>>>>> The data is very small, 500+ lines in each file.
>>>>>>
>>>>>> Removed the last 2 lines and placed this at the end:
>>>>>> "matched.collect().foreach(println);". Still no luck. It's been more
>>>>>> than 5 minutes and the execution is still running.
>>>>>>
>>>>>> Checked the logs; nothing in stdout. In stderr I don't see anything
>>>>>> going wrong, all are info messages.
>>>>>>
>>>>>> What else do I need to check?
>>>>>>
>>>>>> ~Sarath
>>>>>>
>>>>>> On Wed, Jul 16, 2014 at 7:23 PM, Soumya Simanta
>>>>>> <soumya.sima...@gmail.com> wrote:
>>>>>>
>>>>>>> Check your executor logs for the output, or if your data is not big,
>>>>>>> collect it in the driver and print it.
>>>>>>>
>>>>>>> On Jul 16, 2014, at 9:21 AM, Sarath Chandra
>>>>>>> <sarathchandra.jos...@algofusiontech.com> wrote:
>>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I'm trying to do a simple record matching between 2 files and wrote
>>>>>>> the following code -
>>>>>>>
>>>>>>> import org.apache.spark.sql.SQLContext;
>>>>>>> import org.apache.spark.rdd.RDD
>>>>>>>
>>>>>>> object SqlTest {
>>>>>>>   case class Test(fld1: String, fld2: String, fld3: String, fld4: String,
>>>>>>>                   fld5: String, fld6: Double, fld7: String);
>>>>>>>
>>>>>>>   sc.addJar("test1-0.1.jar");
>>>>>>>   val file1 = sc.textFile("hdfs://localhost:54310/user/hduser/file1.csv");
>>>>>>>   val file2 = sc.textFile("hdfs://localhost:54310/user/hduser/file2.csv");
>>>>>>>   val sq = new SQLContext(sc);
>>>>>>>   val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l => Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));
>>>>>>>   val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s => Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));
>>>>>>>   val file1_schema = sq.createSchemaRDD(file1_recs);
>>>>>>>   val file2_schema = sq.createSchemaRDD(file2_recs);
>>>>>>>   file1_schema.registerAsTable("file1_tab");
>>>>>>>   file2_schema.registerAsTable("file2_tab");
>>>>>>>   val matched = sq.sql("select * from file1_tab l join file2_tab s on l.fld7=s.fld7 where l.fld3=s.fld3 and l.fld4=s.fld4 and l.fld6=s.fld6 and l.fld2=s.fld2");
>>>>>>>   val count = matched.count();
>>>>>>>   System.out.println("Found " + count + " matching records");
>>>>>>> }
>>>>>>>
>>>>>>> When I run this program on a standalone Spark cluster, it keeps
>>>>>>> running for a long time with no output or error. After waiting for a
>>>>>>> few minutes I'm forcibly killing it.
>>>>>>> But the same program works well when executed from the Spark shell.
>>>>>>>
>>>>>>> What is going wrong? What am I missing?
>>>>>>>
>>>>>>> ~Sarath
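
For anyone comparing the shell run with the standalone run above: the SqlTest snippet relies on the sc that the Spark shell predefines, so a submitted application has to create its own SparkContext. Below is a minimal self-contained sketch of such a version, assuming the Spark 1.0.x SchemaRDD API; the argument handling (two HDFS paths) and the reliance on spark-submit to set the master and ship the jar are illustrative assumptions, not taken from the thread.

package test

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

object SqlTest {
  // Seven distinct fields; fld6 holds the numeric column, fld7 the join key.
  case class Test(fld1: String, fld2: String, fld3: String, fld4: String,
                  fld5: String, fld6: Double, fld7: String)

  def main(args: Array[String]) {
    // A submitted application must build its own context; the shell's "sc"
    // does not exist here. Master and jar are assumed to come from spark-submit.
    val conf = new SparkConf().setAppName("SqlTest")
    val sc = new SparkContext(conf)
    val sq = new SQLContext(sc)

    // args(0) and args(1) are assumed to be the two HDFS input paths.
    val file1_recs: RDD[Test] = sc.textFile(args(0)).map(_.split(","))
      .map(l => Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)))
    val file2_recs: RDD[Test] = sc.textFile(args(1)).map(_.split(","))
      .map(s => Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)))

    // Spark 1.0.x SchemaRDD API: register both RDDs as tables and join them.
    sq.createSchemaRDD(file1_recs).registerAsTable("file1_tab")
    sq.createSchemaRDD(file2_recs).registerAsTable("file2_tab")

    val matched = sq.sql("select * from file1_tab l join file2_tab s on " +
      "l.fld7 = s.fld7 where l.fld2 = s.fld2 and l.fld3 = s.fld3 and " +
      "l.fld4 = s.fld4 and l.fld6 = s.fld6")

    // Collect to the driver so the result is visible in the driver's stdout,
    // not only in the executor logs.
    println("Found " + matched.count() + " matching records")
    matched.collect().foreach(println)

    sc.stop()
  }
}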
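
On the launch side, the Execution steps earlier in the thread start the driver with a bare java -cp command. Spark 1.0.x also ships a spark-submit script that sets the master and distributes the application jar to the executors; a sketch of an equivalent invocation, assuming the class above is packaged into test1-0.1.jar and the same cluster and paths as in the thread:

$SPARK_HOME/bin/spark-submit \
  --class test.SqlTest \
  --master spark://master:7077 \
  --executor-memory 2g \
  test1-0.1.jar \
  hdfs://master:54310/user/hduser/file1.csv \
  hdfs://master:54310/user/hduser/file2.csv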