Re: [SQL] Elasticsearch-hadoop, exception creating temporary table

2015-03-19 Thread Todd Nist
Thanks for the assistance, I found the error; it was something I had done;
PEBCAK.  I had placed a copy of the elasticsearch-hadoop 2.1.0.BETA3 jar in
the project/lib directory, causing it to be picked up as an unmanaged
dependency and brought in first, even though the build.sbt had the correct
version specified, 2.1.0.BUILD-SNAPSHOT

There was no reason for it to be there at all, and it is not something I usually do.
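
For anyone who trips over the same thing, here is a quick way to see what sbt is actually putting on the classpath (a rough sketch using stock sbt 0.13 task names; nothing project specific):

sbt "show compile:unmanagedJars"        # stray jars dropped under lib/ show up here
sbt "show compile:dependencyClasspath"  # the resolved classpath; elasticsearch-hadoop should appear exactly once, as 2.1.0.BUILD-SNAPSHOT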

Thanks again for pointing out that it was a version mismatch issue.

-Todd

On Wed, Mar 18, 2015 at 9:59 PM, Cheng, Hao  wrote:

>  Todd, can you try running the code in the Spark shell (bin/spark-shell)?
> You may need to write some stub code to call the function in
> MappingUtils.scala. In the meantime, can you also check the jar dependency
> tree of your project, or the downloaded dependency jar files, just in case
> multiple versions of Spark have been introduced?
>
>
>
> *From:* Todd Nist [mailto:tsind...@gmail.com]
> *Sent:* Thursday, March 19, 2015 9:04 AM
> *To:* Cheng, Hao
> *Cc:* user@spark.apache.org
> *Subject:* Re: [SQL] Elasticsearch-hadoop, exception creating temporary
> table
>
>
>
> Thanks for the quick response.
>
> The Spark server is spark-1.2.1-bin-hadoop2.4 from the Spark download.
> Here is the startup:
>
> radtech>$ ./sbin/start-master.sh
> starting org.apache.spark.deploy.master.Master, logging to /usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../logs/spark-tnist-org.apache.spark.deploy.master.Master-1-radtech.io.out
>
> Spark assembly has been built with Hive, including Datanucleus jars on classpath
> Spark Command: java -cp ::/usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../conf:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/spark-assembly-1.2.1-hadoop2.4.0.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip radtech.io --port 7077 --webui-port 8080
>
> 15/03/18 20:31:40 INFO Master: Registered signal handlers for [TERM, HUP, INT]
> 15/03/18 20:31:40 INFO SecurityManager: Changing view acls to: tnist
> 15/03/18 20:31:40 INFO SecurityManager: Changing modify acls to: tnist
> 15/03/18 20:31:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(tnist); users with modify permissions: Set(tnist)
> 15/03/18 20:31:41 INFO Slf4jLogger: Slf4jLogger started
> 15/03/18 20:31:41 INFO Remoting: Starting remoting
> 15/03/18 20:31:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkmas...@radtech.io:7077]
> 15/03/18 20:31:41 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkmas...@radtech.io:7077]
> 15/03/18 20:31:41 INFO Utils: Successfully started service 'sparkMaster' on port 7077.
> 15/03/18 20:31:41 INFO Master: Starting Spark master at spark://radtech.io:7077
> 15/03/18 20:31:41 INFO Utils: Successfully started service 'MasterUI' on port 8080.
> 15/03/18 20:31:41 INFO MasterWebUI: Started MasterWebUI at http://192.168.1.5:8080
> 15/03/18 20:31:41 INFO Master: I have been elected leader! New state: ALIVE
>
>  My build.sbt for the Spark job is as follows:
>
> import AssemblyKeys._
>
> // activating assembly plugin
> assemblySettings
>
> name := "elasticsearch-spark"
>
> version := "0.0.1"
>
> val SCALA_VERSION = "2.10.4"
> val SPARK_VERSION = "1.2.1"
>
> val defaultSettings = Defaults.coreDefaultSettings ++ Seq(
>   organization := "io.radtec",
>   scalaVersion := SCALA_VERSION,
>   resolvers := Seq(
>     //"ods-repo" at "http://artifactory.ods:8082/artifactory/repo",
>     Resolver.typesafeRepo("releases")),
>   scalacOptions ++= Seq(
>     "-unchecked",
>     "-deprecation",
>     "-Xlint",
>     "-Ywarn-dead-code",
>     "-language:_",
>     "-target:jvm-1.7",
>     "-encoding",
>     "UTF-8"
>   ),
>   parallelExecution in Test := false,
>   testOptions += Tests.Argument(TestFrameworks.JUnit, "-v"),
>   publishArtifact in (Test, packageBin) := true,
>   unmanagedSourceDirectories in Compile <<= 

Re: [SQL] Elasticsearch-hadoop, exception creating temporary table

2015-03-18 Thread Todd Nist
ckson-module-jaxb-annotations" % "2.5.1",
  "com.fasterxml.jackson.module"     %% "jackson-module-scala"    % "2.5.1",
  "com.fasterxml.jackson.dataformat"  % "jackson-dataformat-xml"  % "2.5.1",
  "com.fasterxml.jackson.datatype"    % "jackson-datatype-joda"   % "2.5.1"
)

val commonTestDeps = Seq(
  "org.specs2"     %% "specs2"      % "2.3.11" % "test",
  "org.mockito"     % "mockito-all" % "1.9.5"  % "test",
  "org.scalacheck" %% "scalacheck"  % "1.11.3" % "test",
  "org.scalatest"  %% "scalatest"   % "1.9.1"  % "test")
// Project definitions

lazy val root = (project in file("."))
  .settings(defaultSettings:_*)
  .settings(libraryDependencies ++= Seq(
    "com.databricks"    %% "spark-csv"       % "0.1",
    // Spark itself, configured as provided, since it shouldn't go to the assembly jar
    "org.apache.spark"  %% "spark-core"      % SPARK_VERSION % "provided",
    "org.apache.spark"  %% "spark-streaming" % SPARK_VERSION % "provided",
    "org.apache.spark"  %% "spark-sql"       % SPARK_VERSION % "provided",
    "org.apache.spark"  %% "spark-hive"      % SPARK_VERSION % "provided",
    ("org.apache.spark" %% "spark-streaming-kafka" % SPARK_VERSION).
      exclude("org.apache.spark", "spark-core_2.10").
      exclude("org.apache.spark", "spark-streaming_2.10").
      exclude("org.apache.spark", "spark-sql_2.10").
      exclude("javax.jms", "jms"),
    "org.apache.spark"  %% "spark-streaming" % SPARK_VERSION % "test" classifier "tests",
    "com.typesafe"       % "config"          % "1.2.1",
    "com.typesafe.play" %% "play-json"       % "2.3.4"
  ) ++ hadoopDeps ++ esHadoopDeps ++ jsonDeps ++ commonTestDeps ++ commonDeps)

resolvers ++= Seq(
  Resolver.sonatypeRepo("snapshots"),
  Resolver.sonatypeRepo("public"),
  "conjars.org" at "http://conjars.org/repo",
  "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
  "Spray Repository" at "http://repo.spray.cc/",
  "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
  "Akka Repository" at "http://repo.akka.io/releases/",
  "Twitter4J Repository" at "http://twitter4j.org/maven2/",
  "Apache HBase" at "https://repository.apache.org/content/repositories/releases",
  "Twitter Maven Repo" at "http://maven.twttr.com/",
  "scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
  "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/"
)

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
    case m if m.startsWith("META-INF") => MergeStrategy.discard
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", xs @ _*) => MergeStrategy.first
    case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
    case "about.html"     => MergeStrategy.rename
    case "reference.conf" => MergeStrategy.concat
    case _ => MergeStrategy.first
  }
}

Am I by chance missing an exclude that is bringing an older version of
Spark into the assembly? Hmm, I need to go look at that.
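
One way to double-check is to look inside the assembly jar itself (a rough sketch; the jar path is only a guess at what sbt-assembly produces for this project, so adjust it):

# did any Spark classes leak into the fat jar, in particular the catalyst StructField from the stack trace?
jar tf target/scala-2.10/elasticsearch-spark-assembly-0.0.1.jar | grep 'org/apache/spark/sql/catalyst/types/StructField'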

I am using the SNAPSHOT build of elasticsearch-hadoop as it is built
against Spark 1.2.1.  Per the elasticsearch-hadoop gradle.properties, the
Spark version is set to:

sparkVersion = 1.2.1

Other than possibly missing an exclude that is bringing in an older version of
Spark from somewhere, I do see that I am referencing
"org.apache.hadoop" % "hadoop-client" % "2.6.0"

RE: [SQL] Elasticsearch-hadoop, exception creating temporary table

2015-03-18 Thread Cheng, Hao
It seems the elasticsearch-hadoop project was built against an old version of Spark, 
and then you upgraded the Spark version in the execution environment. As far as I 
know, the definition of StructField changed in Spark 1.2; can you confirm the version 
problem first?
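
A quick way to confirm it from the cluster's bin/spark-shell could be something like the following (only a sketch; the class name comes from the stack trace below, and the constructor list just needs to be compared by eye against the (String, DataType, Boolean) signature in the error):

// run inside bin/spark-shell started against the cluster's Spark build
println(sc.version)   // the Spark version the shell is actually running
val cls = Class.forName("org.apache.spark.sql.catalyst.types.StructField")
cls.getConstructors.foreach(println)   // is a 3-arg (String, DataType, Boolean) constructor still there?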

From: Todd Nist [mailto:tsind...@gmail.com]
Sent: Thursday, March 19, 2015 7:49 AM
To: user@spark.apache.org
Subject: [SQL] Elasticsearch-hadoop, exception creating temporary table



I am attempting to access ElasticSearch and expose its data through SparkSQL 
using the elasticsearch-hadoop project.  I am encountering the following 
exception when trying to create a temporary table from a resource in 
ElasticSearch:

15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at EsSparkSQL.scala:51, took 0.862184 s
Create Temporary Table for querying
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
at org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33)
at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32)
at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:36)
at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20)
at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67)
at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
at io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87)
at io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)



I have loaded the “accounts.json” file from ElasticSearch into my ElasticSearch 
cluster. The mapping looks as follows:

radtech:elastic-search-work tnist$ curl -XGET 
'http://localhost:9200/bank/_mapping'

{"bank":{"mappings":{"account":{"properties":{"account_number":{"type":"long"},"address":{"type":"string"},"age":{"type":"long"},"balance":{"type":"long"},"city":{"type":"string"},"email":{"type":"string"},"employer":{"type":"string"},"firstname":{"type":"string"},"gender":{"type":"string"},"lastname":{"type":"string"},"state":{"type":"string"}}

I can read the data just fine doing the following:

import java.io.File

import scala.collection.JavaConversions._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SchemaRDD,SQLContext}

// ES imports
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

import io.radtech.spark.utils.{Settings, Spark, ElasticSearch}

object ElasticSearchReadWrite {

  /**
   * Spark specific configuration
   */
  def sparkInit(): SparkContext = {
    val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
    conf.set("es.nodes", ElasticSearch.Nodes)
    conf.set("es.port", ElasticSearch.HttpPort.toStri

[SQL] Elasticsearch-hadoop, exception creating temporary table

2015-03-18 Thread Todd Nist
I am attempting to access ElasticSearch and expose its data through
SparkSQL using the elasticsearch-hadoop project.  I am encountering the
following exception when trying to create a temporary table from a resource
in ElasticSearch:

15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at
EsSparkSQL.scala:51, took 0.862184 s
Create Temporary Table for querying
Exception in thread "main" java.lang.NoSuchMethodError:
org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
at 
org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
at 
org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
at 
org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at 
org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
at 
org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
at 
org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33)
at 
org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32)
at 
org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:36)
at 
org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20)
at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67)
at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
at 
io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87)
at 
io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

I have loaded the “accounts.json” file from ElasticSearch into my
ElasticSearch cluster. The mapping looks as follows:

radtech:elastic-search-work tnist$ curl -XGET
'http://localhost:9200/bank/_mapping'
{"bank":{"mappings":{"account":{"properties":{"account_number":{"type":"long"},"address":{"type":"string"},"age":{"type":"long"},"balance":{"type":"long"},"city":{"type":"string"},"email":{"type":"string"},"employer":{"type":"string"},"firstname":{"type":"string"},"gender":{"type":"string"},"lastname":{"type":"string"},"state":{"type":"string"}}

I can read the data just fine doing the following:

import java.io.File

import scala.collection.JavaConversions._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SchemaRDD,SQLContext}

// ES imports
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

import io.radtech.spark.utils.{Settings, Spark, ElasticSearch}

object ElasticSearchReadWrite {

  /**
   * Spark specific configuration
   */
  def sparkInit(): SparkContext = {
    val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
    conf.set("es.nodes", ElasticSearch.Nodes)
    conf.set("es.port", ElasticSearch.HttpPort.toString())
    conf.set("es.index.auto.create", "true")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.executor.memory","1g")
    conf.set("spark.kryoserializer.buffer.mb","256")

    val sparkContext = new SparkContext(conf)

    sparkContext
  }

  def main(args: Array[String]) {

    val sc = sparkInit

    val sqlContext = new SQLContext(sc)
    import sqlContext._

    val start = System.currentTimeMillis()

    /*
     * Read from ES and query with Spark & SparkSQL
     */
    val esData = sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}")

    esData.collect.foreach(println(_))

    val end = System.currentTimeMillis()
    println(s"Total time: ${end-start} ms")

This works fine and prints the contents of esData out as one would
expect.

15/03/18 07:54:42 INFO DAGScheduler: Job 0 finished: collect at
ElasticSearchReadWrit
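
The step that then fails is the temporary-table creation. The usual shape of that call with elasticsearch-hadoop's Spark SQL support is roughly the following; this is a sketch based on the es-hadoop documentation, not the actual code in ElasticSearchReadWrite.scala (which the archive cuts off), and the 'resource' option name is taken from the docs as I remember them:

// hedged sketch: register the ES resource as a temporary table, then query it through SparkSQL
sqlContext.sql(
  """CREATE TEMPORARY TABLE accounts
    |USING org.elasticsearch.spark.sql
    |OPTIONS (resource 'bank/account')""".stripMargin)

sqlContext.sql("SELECT firstname, lastname, balance FROM accounts WHERE balance > 30000")
  .collect.foreach(println)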