Can you print out the queryExecution? (i.e. println(sql(....).queryExecution))
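Against the repro program quoted below, that would look roughly like the following sketch (the query string is taken from Keith's example; the exact plans printed depend on the Spark 1.0.x build):

    // Prints the logical and physical plans Catalyst built for the failing join
    // (SchemaRDD.queryExecution, Spark 1.0.x API).
    val joined = sql("SELECT rdd1.key, rdd1.value, rdd2.value FROM rdd1 JOIN rdd2 ON rdd1.key = rdd2.key ORDER BY rdd1.key")
    println(joined.queryExecution)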
On Tue, Jul 15, 2014 at 12:44 PM, Keith Simmons <keith.simm...@gmail.com> wrote:
> To give a few more details of my environment in case that helps you
> reproduce:
>
> * I'm running Spark 1.0.1 downloaded as a tarball, not built myself
> * I'm running in standalone mode, with 1 master and 1 worker, both on the
>   same machine (though the same error occurs with two workers on two machines)
> * I'm using spark-core and spark-sql 1.0.1 pulled via maven
>
> Here's my build.sbt:
>
> name := "spark-test"
>
> version := "1.0"
>
> scalaVersion := "2.10.4"
>
> resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
>
> resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
>
> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.1" % "provided"
>
> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1" % "provided"
>
>
> On Tue, Jul 15, 2014 at 12:21 PM, Zongheng Yang <zonghen...@gmail.com> wrote:
>
>> FWIW, I am unable to reproduce this using the example program locally.
>>
>> On Tue, Jul 15, 2014 at 11:56 AM, Keith Simmons <keith.simm...@gmail.com> wrote:
>> > Nope. All of them are registered from the driver program.
>> >
>> > However, I think we've found the culprit. If the join column between two
>> > tables is not in the same column position in both tables, it triggers what
>> > appears to be a bug. For example, this program fails:
>> >
>> > import org.apache.spark.SparkContext._
>> > import org.apache.spark.SparkContext
>> > import org.apache.spark.SparkConf
>> > import org.apache.spark.sql.SQLContext
>> > import org.apache.spark.sql.SchemaRDD
>> > import org.apache.spark.sql.catalyst.types._
>> >
>> > case class Record(value: String, key: Int)
>> > case class Record2(key: Int, value: String)
>> >
>> > object TestJob {
>> >
>> >   def main(args: Array[String]) {
>> >     run()
>> >   }
>> >
>> >   private def run() {
>> >     val sparkConf = new SparkConf()
>> >     sparkConf.setAppName("TestJob")
>> >     sparkConf.set("spark.cores.max", "8")
>> >     sparkConf.set("spark.storage.memoryFraction", "0.1")
>> >     sparkConf.set("spark.shuffle.memoryFracton", "0.2")
>> >     sparkConf.set("spark.executor.memory", "2g")
>> >     sparkConf.setJars(List("target/scala-2.10/spark-test-assembly-1.0.jar"))
>> >     sparkConf.setMaster(s"spark://dev1.dev.pulse.io:7077")
>> >     sparkConf.setSparkHome("/home/pulseio/spark/current")
>> >     val sc = new SparkContext(sparkConf)
>> >
>> >     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>> >     import sqlContext._
>> >
>> >     val rdd1 = sc.parallelize((1 to 100).map(i => Record(s"val_$i", i)))
>> >     val rdd2 = sc.parallelize((1 to 100).map(i => Record2(i, s"rdd_$i")))
>> >     rdd1.registerAsTable("rdd1")
>> >     rdd2.registerAsTable("rdd2")
>> >
>> >     sql("SELECT * FROM rdd1").collect.foreach { row => println(row) }
>> >
>> >     sql("SELECT rdd1.key, rdd1.value, rdd2.value FROM rdd1 join rdd2 on rdd1.key = rdd2.key order by rdd1.key").collect.foreach { row => println(row) }
>> >
>> >     sc.stop()
>> >   }
>> >
>> > }
>> >
>> > If you change the definitions of Record and Record2 to the following, it
>> > succeeds:
>> >
>> > case class Record(key: Int, value: String)
>> > case class Record2(key: Int, value: String)
>> >
>> > as does:
>> >
>> > case class Record(value: String, key: Int)
>> > case class Record2(value: String, key: Int)
>> >
>> > Let me know if you need any more details.
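A quick way to see the column-order mismatch Keith describes, assuming the two tables are registered exactly as in the program above, is to print each table's inferred schema (a sketch against the Spark 1.0.x SchemaRDD API):

    // Shows the field order Catalyst inferred from each case class.
    sql("SELECT * FROM rdd1").printSchema()   // expect: value, then key
    sql("SELECT * FROM rdd2").printSchema()   // expect: key, then value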
>> >
>> > On Tue, Jul 15, 2014 at 11:14 AM, Michael Armbrust <mich...@databricks.com> wrote:
>> >>
>> >> Are you registering multiple RDDs of case classes as tables concurrently?
>> >> You are possibly hitting SPARK-2178, which is caused by SI-6240.
>> >>
>> >>
>> >> On Tue, Jul 15, 2014 at 10:49 AM, Keith Simmons <keith.simm...@gmail.com> wrote:
>> >>>
>> >>> Hi folks,
>> >>>
>> >>> I'm running into the following error when trying to perform a join in my
>> >>> code:
>> >>>
>> >>> java.lang.NoClassDefFoundError: Could not initialize class
>> >>> org.apache.spark.sql.catalyst.types.LongType$
>> >>>
>> >>> I see similar errors for StringType$ and also:
>> >>>
>> >>> scala.reflect.runtime.ReflectError: value apache is not a package.
>> >>>
>> >>> Strangely, if I just work with a single table, everything is fine. I can
>> >>> iterate through the records in both tables and print them out without a
>> >>> problem.
>> >>>
>> >>> Furthermore, this code worked without an exception in Spark 1.0.0
>> >>> (though the join caused some field corruption, possibly related to
>> >>> https://issues.apache.org/jira/browse/SPARK-1994). The data is coming from
>> >>> a custom protocol-buffer-based format on HDFS that is being mapped into the
>> >>> individual record types without a problem.
>> >>>
>> >>> The immediate cause seems to be a task trying to deserialize one or more
>> >>> SQL case classes before loading the Spark uber jar, but I have no idea why
>> >>> this is happening, or why it only happens when I do a join. Ideas?
>> >>>
>> >>> Keith
>> >>>
>> >>> P.S. If it's relevant, we're using the Kryo serializer.
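For readers who land on this thread later, the "registering concurrently" pattern Michael asks about looks roughly like the sketch below: two driver-side threads reflecting over case classes at the same time, which can trip SI-6240, the root cause of SPARK-2178. This is illustrative only, not code from the thread; it assumes the sc, sqlContext, Record, and Record2 definitions from Keith's program and the Spark 1.0.x API:

    // Illustrative only: concurrent table registration from two driver-side threads.
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    import sqlContext.createSchemaRDD

    // Each conversion to a SchemaRDD runs Scala reflection over the case class;
    // doing two at once is the pattern SPARK-2178 / SI-6240 is about.
    val regA = Future { createSchemaRDD(sc.parallelize(1 to 100).map(i => Record(s"val_$i", i))).registerAsTable("rdd1") }
    val regB = Future { createSchemaRDD(sc.parallelize(1 to 100).map(i => Record2(i, s"rdd_$i"))).registerAsTable("rdd2") }
    Await.result(Future.sequence(Seq(regA, regB)), 1.minute)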