Re: Simple record matching using Spark SQL
Hi Sarath,

I will try to reproduce the problem.

Thanks,
Yin

On Wed, Jul 23, 2014 at 11:32 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote:

Hi Michael,

Sorry for the delayed response. I'm using Spark 1.0.1 (the pre-built version for Hadoop 1). I'm running Spark programs on a standalone Spark cluster with 2 nodes; one node acts as both master and worker, the other is just a worker.

I didn't quite understand what you meant when you asked for a jstack of the driver and executor, so I'm attaching the log files generated in $SPARK_HOME/logs along with the stdout and stderr files for this job from the $SPARK_HOME/work folder on both nodes. I'm also attaching the program I executed. If I uncomment lines 36 and 37, the program works fine; otherwise it just keeps running forever.

~Sarath
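For readers unfamiliar with the request Sarath refers to: jstack is the JDK utility that dumps the thread stacks of a running JVM, which is what Michael asks for further down the thread. A rough sketch of capturing one for the driver and an executor while the job is hanging (process names are what a Spark 1.0.x standalone deployment typically shows in jps; treat the exact names and file paths as an assumption, not something taken from this thread):

  # list JVMs on the node; the driver shows up under its main class,
  # executors usually as CoarseGrainedExecutorBackend on standalone mode
  jps -lm
  # dump thread stacks of the hanging processes
  jstack <driver-pid>   > driver-jstack.txt
  jstack <executor-pid> > executor-jstack.txt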
Re: Simple record matching using Spark SQL
Hi Sarath,

Have you tried the current branch-1.0? If not, can you give it a try and see if the problem is resolved?

Thanks,
Yin
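Yin's suggestion refers to building Spark from the branch-1.0 line of the source tree rather than using the 1.0.1 binary release. A rough sketch of doing that, assuming a standard git and sbt setup (the Hadoop version and commands below mirror the Spark 1.0 build documentation and are not taken from this thread):

  git clone https://github.com/apache/spark.git
  cd spark
  git checkout branch-1.0
  # build an assembly against Hadoop 1.x to match the existing deployment
  SPARK_HADOOP_VERSION=1.0.4 sbt/sbt assembly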
Re: Simple record matching using Spark SQL
I added the two lines below just before the SQL query line -

...
file1_schema.count;
file2_schema.count;
...

and it started working. But I couldn't work out the reason. Can someone please explain? What was happening earlier, and what changes with the addition of these two lines?

~Sarath

On Thu, Jul 17, 2014 at 1:13 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote:

No Sonal, I'm not making any explicit call to stop the context. If you see my previous post to Michael, the commented portion of the code is my requirement. When I run it on the standalone Spark cluster, the execution keeps running with no output or error. After waiting for several minutes I kill it by pressing Ctrl+C in the terminal. But the same code runs perfectly when executed from the Spark shell.

~Sarath

On Thu, Jul 17, 2014 at 1:05 PM, Sonal Goyal sonalgoy...@gmail.com wrote:

Hi Sarath,

Are you explicitly stopping the context? sc.stop()

Best Regards,
Sonal
Nube Technologies
http://www.nubetech.co
http://in.linkedin.com/in/sonalgoyal

On Thu, Jul 17, 2014 at 12:51 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote:

Hi Michael, Soumya,

Can you please check and let me know what the issue is? What am I missing? Let me know if you need any logs to analyze.

~Sarath
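For context, a minimal sketch of where those two extra lines sit relative to the rest of the program, using the names from the SqlTest code quoted later in this thread. This only shows the placement of the workaround Sarath describes; the thread does not establish why it helps.

  // file1_schema and file2_schema are the SchemaRDDs registered as
  // file1_tab and file2_tab in the SqlTest code quoted further down
  file1_schema.count   // workaround: force both SchemaRDDs to be evaluated
  file2_schema.count   // before the join query is issued
  val matched = sq.sql("select * from file1_tab l join file2_tab s on l.fld7=s.fld7 " +
    "where l.fld3=s.fld3 and l.fld4=s.fld4 and l.fld6=s.fld6 and l.fld2=s.fld2")
  matched.collect().foreach(println)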
Re: Simple record matching using Spark SQL
What version are you running? Could you provide a jstack of the driver and executor when it is hanging?
Re: Simple record matching using Spark SQL
Check your executor logs for the output, or, if your data is not big, collect it in the driver and print it.

On Jul 16, 2014, at 9:21 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote:

Hi All,

I'm trying to do simple record matching between 2 files and wrote the following code -

import org.apache.spark.sql.SQLContext
import org.apache.spark.rdd.RDD

object SqlTest {
  // 7 input columns; fld6 is numeric, the rest are strings
  case class Test(fld1: String, fld2: String, fld3: String, fld4: String,
                  fld5: String, fld6: Double, fld7: String)

  // sc is the SparkContext (available directly when these lines are pasted into the spark-shell)
  sc.addJar("test1-0.1.jar")
  val file1 = sc.textFile("hdfs://localhost:54310/user/hduser/file1.csv")
  val file2 = sc.textFile("hdfs://localhost:54310/user/hduser/file2.csv")
  val sq = new SQLContext(sc)
  val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l => Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)))
  val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s => Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)))
  val file1_schema = sq.createSchemaRDD(file1_recs)
  val file2_schema = sq.createSchemaRDD(file2_recs)
  file1_schema.registerAsTable("file1_tab")
  file2_schema.registerAsTable("file2_tab")
  val matched = sq.sql("select * from file1_tab l join file2_tab s on l.fld7=s.fld7 " +
    "where l.fld3=s.fld3 and l.fld4=s.fld4 and l.fld6=s.fld6 and l.fld2=s.fld2")
  val count = matched.count()
  System.out.println("Found " + count + " matching records")
}

When I run this program on a standalone Spark cluster, it keeps running for a long time with no output or error. After waiting for a few minutes I forcibly kill it. But the same program works well when executed from a Spark shell. What is going wrong? What am I missing?

~Sarath
Re: Simple record matching using Spark SQL
Hi Soumya,

The data is very small, 500+ lines in each file. I removed the last 2 lines and placed matched.collect().foreach(println); at the end. Still no luck. It's been more than 5 minutes and the execution is still running. I checked the logs: nothing in stdout, and in stderr I don't see anything going wrong, all are INFO messages. What else do I need to check?

~Sarath
Re: Simple record matching using Spark SQL
When you submit your job, it should appear on the Spark UI, the same as with the REPL. Make sure your job is submitted to the cluster properly.
Re: Simple record matching using Spark SQL
Yes, it is appearing on the Spark UI, and it remains there with state RUNNING until I press Ctrl+C in the terminal to kill the execution. Barring the statements that create the Spark context, if I copy-paste the lines of my code into the Spark shell, it runs perfectly and gives the desired output.

~Sarath
Re: Simple record matching using Spark SQL
Can you try submitting a very simple job to the cluster?
Re: Simple record matching using Spark SQL
Yes Soumya, I did. First I tried the example available in the documentation (the example using a people table and finding teenagers). After successfully running it, I moved on to this one, which is the starting point of a bigger requirement for which I'm evaluating Spark SQL.
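For reference, the documentation example Sarath mentions is roughly the following, paraphrased from the Spark 1.0 SQL programming guide; the file name and schema are the guide's, not this thread's:

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext._    // brings in the implicit RDD-to-SchemaRDD conversion

  case class Person(name: String, age: Int)
  val people = sc.textFile("examples/src/main/resources/people.txt")
    .map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
  people.registerAsTable("people")

  val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
  teenagers.map(t => "Name: " + t(0)).collect().foreach(println)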
Re: Simple record matching using Spark SQL
What if you just run something like:

sc.textFile("hdfs://localhost:54310/user/hduser/file1.csv").count()
Re: Simple record matching using Spark SQL
Hi Michael,

Tried it. It correctly prints the line counts of both files. Here's what I tried -

Code:

package test

import org.apache.spark.{SparkConf, SparkContext}
// the commented-out section below additionally needs:
// import org.apache.spark.rdd.RDD
// import org.apache.spark.sql.SQLContext

object Test4 {
  case class Test(fld1: String, fld2: String, fld3: String, fld4: String,
                  fld5: String, fld6: Double, fld7: String)

  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster(args(0))
      .setAppName("SQLTest")
      .setSparkHome(args(1))
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    sc.addJar("test1-0.1.jar")

    val file1 = sc.textFile(args(2))
    println(file1.count())
    val file2 = sc.textFile(args(3))
    println(file2.count())

    //val sq = new SQLContext(sc)
    //import sq._
    //val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l => Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)))
    //val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s => Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)))
    //val file1_schema = sq.createSchemaRDD(file1_recs)
    //val file2_schema = sq.createSchemaRDD(file2_recs)
    //file1_schema.registerAsTable("file1_tab")
    //file2_schema.registerAsTable("file2_tab")
    //val matched = sq.sql("select * from file1_tab l join file2_tab s on " +
    //  "l.fld7=s.fld7 where l.fld2=s.fld2 and " +
    //  "l.fld3=s.fld3 and l.fld4=s.fld4 and " +
    //  "l.fld6=s.fld6")
    //matched.collect().foreach(println)
  }
}

Execution:

export CLASSPATH=$HADOOP_PREFIX/conf:$SPARK_HOME/lib/*:test1-0.1.jar
export CONFIG_OPTS=-Dspark.jars=test1-0.1.jar
java -cp $CLASSPATH $CONFIG_OPTS test.Test4 spark://master:7077 /usr/local/spark-1.0.1-bin-hadoop1 hdfs://master:54310/user/hduser/file1.csv hdfs://master:54310/user/hduser/file2.csv

~Sarath
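An aside on the submission step: in Spark 1.0.x the documented way to launch an application on a standalone cluster is bin/spark-submit rather than invoking java directly with -Dspark.jars. A rough equivalent of the command above (the application arguments mirror what Test4 expects; treat the exact invocation as a sketch, not something verified in this thread):

  $SPARK_HOME/bin/spark-submit \
    --class test.Test4 \
    --master spark://master:7077 \
    --executor-memory 2g \
    test1-0.1.jar \
    spark://master:7077 /usr/local/spark-1.0.1-bin-hadoop1 \
    hdfs://master:54310/user/hduser/file1.csv \
    hdfs://master:54310/user/hduser/file2.csv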