Re: [SQL] Elasticsearch-hadoop, exception creating temporary table
Thanks for the assistance. I found the error; it was something I had done: PEBCAK. I had placed a copy of elasticsearch-hadoop 2.1.0.BETA3 in the project/lib directory, causing it to be picked up as an unmanaged dependency and brought in first, even though build.sbt had the correct version, 2.1.0.BUILD-SNAPSHOT, specified. There was no reason for it to be there at all, and it's not something I usually do. Thanks again for pointing out that it was a version mismatch issue.

-Todd

On Wed, Mar 18, 2015 at 9:59 PM, Cheng, Hao wrote:

> Todd, can you try running the code in the Spark shell (bin/spark-shell)? Maybe
> you need to write some fake code to call the functions in MappingUtils.scala.
> In the meantime, can you also check the jar dependency tree of your project,
> or the downloaded dependency jar files, just in case multiple versions of
> Spark have been introduced?
>
> From: Todd Nist [mailto:tsind...@gmail.com]
> Sent: Thursday, March 19, 2015 9:04 AM
> To: Cheng, Hao
> Cc: user@spark.apache.org
> Subject: Re: [SQL] Elasticsearch-hadoop, exception creating temporary table
>
> Thanks for the quick response.
>
> The spark server is spark-1.2.1-bin-hadoop2.4 from the Spark download.
> Here is the startup:
>
>     radtech>$ ./sbin/start-master.sh
>     starting org.apache.spark.deploy.master.Master, logging to /usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../logs/spark-tnist-org.apache.spark.deploy.master.Master-1-radtech.io.out
>
>     Spark assembly has been built with Hive, including Datanucleus jars on classpath
>     Spark Command: java -cp ::/usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../conf:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/spark-assembly-1.2.1-hadoop2.4.0.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip radtech.io --port 7077 --webui-port 8080
>
>     15/03/18 20:31:40 INFO Master: Registered signal handlers for [TERM, HUP, INT]
>     15/03/18 20:31:40 INFO SecurityManager: Changing view acls to: tnist
>     15/03/18 20:31:40 INFO SecurityManager: Changing modify acls to: tnist
>     15/03/18 20:31:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(tnist); users with modify permissions: Set(tnist)
>     15/03/18 20:31:41 INFO Slf4jLogger: Slf4jLogger started
>     15/03/18 20:31:41 INFO Remoting: Starting remoting
>     15/03/18 20:31:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkmas...@radtech.io:7077]
>     15/03/18 20:31:41 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkmas...@radtech.io:7077]
>     15/03/18 20:31:41 INFO Utils: Successfully started service 'sparkMaster' on port 7077.
>     15/03/18 20:31:41 INFO Master: Starting Spark master at spark://radtech.io:7077
>     15/03/18 20:31:41 INFO Utils: Successfully started service 'MasterUI' on port 8080.
>     15/03/18 20:31:41 INFO MasterWebUI: Started MasterWebUI at http://192.168.1.5:8080
>     15/03/18 20:31:41 INFO Master: I have been elected leader! New state: ALIVE
>
> My build.sbt for the spark job is as follows:
>
>     import AssemblyKeys._
>
>     // activating assembly plugin
>     assemblySettings
>
>     name := "elasticsearch-spark"
>
>     version := "0.0.1"
>
>     val SCALA_VERSION = "2.10.4"
>     val SPARK_VERSION = "1.2.1"
>
>     val defaultSettings = Defaults.coreDefaultSettings ++ Seq(
>       organization := "io.radtec",
>       scalaVersion := SCALA_VERSION,
>       resolvers := Seq(
>         //"ods-repo" at "http://artifactory.ods:8082/artifactory/repo",
>         Resolver.typesafeRepo("releases")),
>       scalacOptions ++= Seq(
>         "-unchecked",
>         "-deprecation",
>         "-Xlint",
>         "-Ywarn-dead-code",
>         "-language:_",
>         "-target:jvm-1.7",
>         "-encoding",
>         "UTF-8"
>       ),
>       parallelExecution in Test := false,
>       testOptions += Tests.Argument(TestFrameworks.JUnit, "-v"),
>       publishArtifact in (Test, packageBin) := true,
>       unmanagedSourceDirectories in Compile <<=
>       ckson-module-jaxb-annotations" % "2.5.1",
>       "com.fasterxml.jackson.module"     %% "jackson-module-scala"     % "2.5.1",
>       "com.fasterxml.jackson.dataformat" %  "jackson-dataformat-xml"   % "2.5.1",
>       "com.fasterxml.jackson.datatype"   %  "jackson-datatype-joda"    % "2.5.1"
>     )
>
>     val commonTestDeps = Seq(
>       "org.specs2"     %% "specs2"      % "2.3.11" % "test",
>       "org.mockito"    %  "mockito-all" % "1.9.5"  % "test",
>       "org.scalacheck" %% "scalacheck"  % "1.11.3" % "test",
>       "org.scalatest"  %% "scalatest"   % "1.9.1"  % "test")
>
>     // Project definitions
>     lazy val root = (project in file("."))
>       .settings(defaultSettings:_*)
>       .settings(libraryDependencies ++= Seq(
>         "com.databricks" %% "spark-csv" % "0.1",
>         // Spark itself, configured as provided, since it shouldn't go to assembly jar
>         "org.apache.spark" %% "spark-core"      % SPARK_VERSION % "provided",
>         "org.apache.spark" %% "spark-streaming" % SPARK_VERSION % "provided",
>         "org.apache.spark" %% "spark-sql"       % SPARK_VERSION % "provided",
>         "org.apache.spark" %% "spark-hive"      % SPARK_VERSION % "provided",
>         ("org.apache.spark" %% "spark-streaming-kafka" % SPARK_VERSION).
>           exclude("org.apache.spark", "spark-core_2.10").
>           exclude("org.apache.spark", "spark-streaming_2.10").
>           exclude("org.apache.spark", "spark-sql_2.10").
>           exclude("javax.jms", "jms"),
>         "org.apache.spark" %% "spark-streaming" % SPARK_VERSION % "test" classifier "tests",
>         "com.typesafe"      %  "config"    % "1.2.1",
>         "com.typesafe.play" %% "play-json" % "2.3.4"
>       ) ++ hadoopDeps ++ esHadoopDeps ++ jsonDeps ++ commonTestDeps ++ commonDeps)
>
>     resolvers ++= Seq(
>       Resolver.sonatypeRepo("snapshots"),
>       Resolver.sonatypeRepo("public"),
>       "conjars.org" at "http://conjars.org/repo",
>       "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
>       "Spray Repository" at "http://repo.spray.cc/",
>       "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
>       "Akka Repository" at "http://repo.akka.io/releases/",
>       "Twitter4J Repository" at "http://twitter4j.org/maven2/",
>       "Apache HBase" at "https://repository.apache.org/content/repositories/releases",
>       "Twitter Maven Repo" at "http://maven.twttr.com/",
>       "scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
>       "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
>       "Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/"
>     )
>
>     mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => {
>         case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
>         case m if m.startsWith("META-INF")              => MergeStrategy.discard
>         case PathList("javax", "servlet", xs @ _*)      => MergeStrategy.first
>         case PathList("org", "apache", xs @ _*)         => MergeStrategy.first
>         case PathList("org", "jboss", xs @ _*)          => MergeStrategy.first
>         case "about.html"                               => MergeStrategy.rename
>         case "reference.conf"                           => MergeStrategy.concat
>         case _                                          => MergeStrategy.first
>       }
>     }
>
> Am I by chance missing an exclude that is bringing an older version of Spark into the assembly? Hmm, need to go look at that. I am using the SNAPSHOT build of elasticsearch-hadoop as it is built against 1.2.1 of Spark.
> Per the elasticsearch-hadoop gradle.properties, the Spark version is set to:
>
>     sparkVersion = 1.2.1
>
> Other than possibly missing an exclude that is bringing in an older version of Spark from somewhere, I do see that I am referencing "org.apache.hadoop" % "hadoop-client" % "2.6.0".
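[Editor's note: one quick way to settle the "which version is actually on the classpath" question raised in this message is to ask the classloader where it found a given class. This is a generic sketch, not code from the thread; the object name `WhichJar` and the probed class are illustrative:]

```scala
// Hypothetical helper: report which classpath entry (jar or directory)
// supplies a given class, to spot a shadowed or duplicate dependency,
// e.g. an old elasticsearch-hadoop jar lurking in project/lib.
object WhichJar {
  def locate(className: String): String = {
    // Turn "a.b.C" into the resource path "a/b/C.class" and ask the classloader
    val resource = className.replace('.', '/') + ".class"
    Option(getClass.getClassLoader.getResource(resource))
      .map(_.toString) // e.g. "jar:file:/.../scala-library-2.10.4.jar!/scala/Option.class"
      .getOrElse(s"not found: $resource")
  }

  def main(args: Array[String]): Unit = {
    // Probing a Spark class from inside the assembled jar would reveal which
    // spark-* artifact is live; scala.Option is used so the sketch runs anywhere.
    println(locate("scala.Option"))
  }
}
```

Run from within spark-shell or the assembly, a probe such as `locate("org.apache.spark.sql.catalyst.types.StructField")` would show exactly which jar that definition is being loaded from.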
RE: [SQL] Elasticsearch-hadoop, exception creating temporary table
It seems the elasticsearch-hadoop project was built against an older version of Spark, and you then upgraded the Spark version in the execution environment. As I recall, the definition of StructField changed in Spark 1.2; can you confirm the version problem first?

From: Todd Nist [mailto:tsind...@gmail.com]
Sent: Thursday, March 19, 2015 7:49 AM
To: user@spark.apache.org
Subject: [SQL] Elasticsearch-hadoop, exception creating temporary table
[SQL] Elasticsearch-hadoop, exception creating temporary table
I am attempting to access Elasticsearch and expose its data through Spark SQL using the elasticsearch-hadoop project. I am encountering the following exception when trying to create a temporary table from a resource in Elasticsearch:

    15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at EsSparkSQL.scala:51, took 0.862184 s
    Create Temporary Table for querying
    Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
        at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
        at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
        at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
        at org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
        at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33)
        at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32)
        at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:36)
        at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20)
        at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67)
        at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
        at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
        at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
        at io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87)
        at io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

I have loaded the "accounts.json" sample file from Elasticsearch into my Elasticsearch cluster. The mapping looks as follows:

    radtech:elastic-search-work tnist$ curl -XGET 'http://localhost:9200/bank/_mapping'
    {"bank":{"mappings":{"account":{"properties":{"account_number":{"type":"long"},"address":{"type":"string"},"age":{"type":"long"},"balance":{"type":"long"},"city":{"type":"string"},"email":{"type":"string"},"employer":{"type":"string"},"firstname":{"type":"string"},"gender":{"type":"string"},"lastname":{"type":"string"},"state":{"type":"string"}}}}}}

I can read the data just fine doing the following:

    import java.io.File

    import scala.collection.JavaConversions._

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{SchemaRDD, SQLContext}

    // ES imports
    import org.elasticsearch.spark._
    import org.elasticsearch.spark.sql._

    import io.radtech.spark.utils.{Settings, Spark, ElasticSearch}

    object ElasticSearchReadWrite {

      /**
       * Spark specific configuration
       */
      def sparkInit(): SparkContext = {
        val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
        conf.set("es.nodes", ElasticSearch.Nodes)
        conf.set("es.port", ElasticSearch.HttpPort.toString())
        conf.set("es.index.auto.create", "true")
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        conf.set("spark.executor.memory", "1g")
        conf.set("spark.kryoserializer.buffer.mb", "256")

        val sparkContext = new SparkContext(conf)
        sparkContext
      }

      def main(args: Array[String]) {
        val sc = sparkInit
        val sqlContext = new SQLContext(sc)
        import sqlContext._

        val start = System.currentTimeMillis()

        /*
         * Read from ES and query with Spark & SparkSQL
         */
        val esData = sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}")

        esData.collect.foreach(println(_))

        val end = System.currentTimeMillis()
        println(s"Total time: ${end - start} ms")
      }
    }

This works fine and prints the content of esData out as one would expect:

    15/03/18 07:54:42 INFO DAGScheduler: Job 0 finished: collect at ElasticSearchReadWrit
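[Editor's note: the diagnosis that emerges later in this thread (a library compiled against one version of `StructField`, run against another) can be confirmed without Spark at all, since `NoSuchMethodError` simply means the constructor a library was compiled against no longer exists at runtime. A minimal, generic sketch of that probe; the object name `CtorProbe` and the example classes are illustrative, not from the thread:]

```scala
// Sketch: check at runtime whether a class still exposes the constructor
// some library was compiled against. A 'false' here corresponds to the
// java.lang.NoSuchMethodError seen in the stack trace above.
object CtorProbe {
  def hasCtor(className: String, paramTypes: Class[_]*): Boolean =
    try {
      // Throws if either the class or that exact constructor signature is missing
      Class.forName(className).getConstructor(paramTypes: _*)
      true
    } catch {
      case _: ClassNotFoundException | _: NoSuchMethodException => false
    }

  def main(args: Array[String]): Unit = {
    // java.lang.String does have a (char[]) constructor...
    println(hasCtor("java.lang.String", classOf[Array[Char]]))
    // ...but no (boolean) constructor, mimicking a binary-incompatible upgrade
    println(hasCtor("java.lang.String", classOf[Boolean]))
  }
}
```

On the mismatched classpath described in this thread, probing `org.apache.spark.sql.catalyst.types.StructField` with the `(String, DataType, boolean)` signature from the error message would pin the failure to whichever Spark jar is actually being loaded.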