Question regarding Projection PushDown
Hi All,

Please help with the question below. I am building my own data source to connect to CustomAerospike, and I am almost done with everything, but I am still not sure how to implement projection pushdown when selecting nested columns. Spark handles top-level column projection pushdown implicitly, but nested projection pushdown seems to need a custom implementation. I would like to know whether there is a way I can do it myself; any code pointers would be helpful.

Currently, even when I select("col1.nested2"), projection pushdown only considers col1 and does not narrow the read down to col1.nested2. My plan is to build custom projection pushdown by implementing a method in compute that pulls the specific column.nestedcol and converts it to a Row. My problem in doing so is that I am unable to access the nested column passed in select from my data source: in my relation class I only get col1, and I need a way to access the nested2 column that is provided in the select query.

Regards.
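Spark's PrunedScan only hands top-level column names to a relation, so the nested paths have to be recovered and applied separately. A minimal sketch of the pruning half of that, assuming the dotted paths (e.g. "col1.nested2") have been obtained somehow (the helper name and structure are illustrative, not Spark API):

```scala
import org.apache.spark.sql.types._

// Hypothetical helper: given the full schema and dotted column paths,
// build the pruned schema a custom relation could use to read only the
// requested nested fields.
object NestedPruning {
  def prune(schema: StructType, paths: Seq[String]): StructType = {
    // Group requested paths by their top-level field name.
    val byTop = paths.map(_.split("\\.", 2)).groupBy(_.head)
    StructType(schema.fields.flatMap { f =>
      byTop.get(f.name).map { parts =>
        // Remaining path segments below this field, if any.
        val rest = parts.collect { case Array(_, tail) => tail }
        f.dataType match {
          case s: StructType if rest.nonEmpty => f.copy(dataType = prune(s, rest))
          case _                              => f
        }
      }
    })
  }
}

val schema = StructType(Seq(
  StructField("col1", StructType(Seq(
    StructField("nested1", StringType),
    StructField("nested2", LongType)))),
  StructField("col2", StringType)))

// Keeps col1 but only its nested2 sub-field; drops col2 entirely.
val pruned = NestedPruning.prune(schema, Seq("col1.nested2"))
```

The harder part, which this sketch does not solve, is getting the nested paths in the first place: one workaround is to pass them through the data source options, since buildScan only receives the top-level names.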
Spark error while trying to spark.read.json()
Hi All,

Can anyone help me with the error below?

Exception in thread "main" java.lang.AbstractMethodError
    at scala.collection.TraversableLike$class.filterNot(TraversableLike.scala:278)
    at org.apache.spark.sql.types.StructType.filterNot(StructType.scala:98)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:386)
    at org.spark.jsonDF.StructStreamKafkaToDF$.getValueSchema(StructStreamKafkaToDF.scala:22)
    at org.spark.jsonDF.StructStreaming$.createRowDF(StructStreaming.scala:21)
    at SparkEntry$.delayedEndpoint$SparkEntry$1(SparkEntry.scala:22)
    at SparkEntry$delayedInit$body.apply(SparkEntry.scala:7)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at SparkEntry$.main(SparkEntry.scala:7)
    at SparkEntry.main(SparkEntry.scala)

This happens when I try to pass a Dataset[String] containing JSONs to spark.read.json(Records).

Regards,
Satyajit.
Access Array StructField inside StructType.
Hi All,

How do I iterate over the StructFields inside *after*, in the schema below?

StructType(
  StructField(after, StructType(
    StructField(Alarmed, LongType, true),
    StructField(CallDollarLimit, StringType, true),
    StructField(CallRecordWav, StringType, true),
    StructField(CallTimeLimit, LongType, true),
    StructField(Signature, StringType, true)), true))

Regards,
Satyajit.
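One way to walk such a schema is a small recursive function over StructType; a sketch, using a schema mirroring the one above:

```scala
import org.apache.spark.sql.types._

// Recursively walk a StructType and list every leaf field with its
// dotted path and data type.
def flatten(schema: StructType, prefix: String = ""): Seq[(String, DataType)] =
  schema.fields.toSeq.flatMap { f =>
    val path = if (prefix.isEmpty) f.name else s"$prefix.${f.name}"
    f.dataType match {
      case s: StructType => flatten(s, path)  // descend into nested structs
      case dt            => Seq(path -> dt)
    }
  }

val schema = StructType(Seq(
  StructField("after", StructType(Seq(
    StructField("Alarmed", LongType),
    StructField("CallDollarLimit", StringType),
    StructField("CallRecordWav", StringType),
    StructField("CallTimeLimit", LongType),
    StructField("Signature", StringType))))))

flatten(schema).foreach(println)
```

The same pattern works for iterating just one branch: schema("after").dataType can be matched as a StructType and its fields traversed directly.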
Joining streaming data with static table data.
Hi All,

I am working on a real-time reporting project and have a question about a structured streaming job that streams a particular table's records and has to join them to an existing table:

stream --> query/join to another DF/DS --> update the streamed record.

Now my problem is how to approach the middle layer (the query/join to another DF/DS): should I create a DF from spark.read.format("JDBC"), or stream the other table and maintain the data in a memory sink, or is there a better way to do it?

Would like to know if anyone has faced a similar scenario and has any suggestions on how to go ahead.

Regards,
Satyajit.
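Structured Streaming supports joining a streaming Dataset against a static one directly. A minimal stream-static join sketch; the rate source and in-memory lookup table are stand-ins (in the real job the static side would be spark.read.format("jdbc")... against your table):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[2]").appName("streamStaticJoin").getOrCreate()
import spark.implicits._

// Static lookup side; in practice this could come from
// spark.read.format("jdbc").option("url", ...).option("dbtable", ...).load()
val dim = Seq((0L, "bucket-a"), (1L, "bucket-b")).toDF("id", "name")

// Streaming side; "rate" is a built-in test source producing (timestamp, value).
val stream = spark.readStream.format("rate")
  .option("rowsPerSecond", "5")
  .load()
  .withColumn("id", $"value" % 2)

// Stream-static join: each micro-batch is joined against the static DF.
val joined = stream.join(dim, Seq("id"), "left_outer")
```

One caveat worth knowing: the static JDBC DataFrame is planned once, so if the lookup table changes often you would need to rebuild it (or use a sink-side lookup) to pick up updates.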
Re: Infer JSON schema in structured streaming Kafka.
Hi Burak,

Thank you for the inputs, I will definitely try the options. The reason we don't have a unified schema is that we are trying to consume data from different topics that contain data from different tables in a DB, and so each table has different columns.

Regards,
Satyajit.

On Dec 11, 2017 9:29 AM, "Burak Yavuz" <brk...@gmail.com> wrote:

> In Spark 2.2, you can read from Kafka in batch mode, and then use the json
> reader to infer schema:
>
> val df = spark.read.format("kafka")...
>   .select($"value".cast("string"))
> val json = spark.read.json(df)
> val schema = json.schema
>
> While the above should be slow (since you're reading almost all data in
> Kafka in batch), it would work.
>
> My question to you is, do you think it's worth it? Why do you have a
> random json schema being inputted to your Kafka stream? Can this randomness
> not mess up everything in the future if someone messes up? Not having
> fixed, known schemas with streaming data (or any data for that matter) is
> dangerous for most purposes.
> Just food for thought.
>
> Best,
> Burak
>
> On Mon, Dec 11, 2017 at 4:01 AM, Jacek Laskowski <ja...@japila.pl> wrote:
>
>> Hi,
>>
>> What about a custom streaming Sink that would stop the query after
>> addBatch has been called?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>>
>> https://about.me/JacekLaskowski
>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> On Mon, Dec 11, 2017 at 9:15 AM, satyajit vegesna <
>> satyajit.apas...@gmail.com> wrote:
>>
>>> Hi Jacek,
>>>
>>> For now, I am using Thread.sleep() on the driver, to make sure my
>>> streaming query receives some data and stops, before control reaches the
>>> code that queries the memory table.
>>> Let me know if there is a better way of handling it.
>>>
>>> Regards,
>>> Satyajit.
>>>
>>> On Sun, Dec 10, 2017 at 10:43 PM, satyajit vegesna <
>>> satyajit.apas...@gmail.com> wrote:
>>>
>>>> Hi Jacek,
>>>>
>>>> Thank you for responding back.
>>>>
>>>> I have tried the memory sink, and below is what I did:
>>>>
>>>> val fetchValue = debeziumRecords.selectExpr("value")
>>>>   .withColumn("tableName", functions.get_json_object($"value".cast(StringType), "$.schema.name"))
>>>>   .withColumn("operation", functions.get_json_object($"value".cast(StringType), "$.payload.op"))
>>>>   .withColumn("payloadAfterValue", split(substring_index(debeziumRecords("value"), "\"after\":", -1), ",\"source\"").getItem(0))
>>>>   .drop("tableName").drop("operation").drop("value").as[String]
>>>>   .writeStream
>>>>   .outputMode(OutputMode.Append())
>>>>   .queryName("record")
>>>>   .format("memory")
>>>>   .start()
>>>>
>>>> spark.sql("select * from record").show(truncate = false) // I was
>>>> expecting to be able to use the record table to read the JSON string, but
>>>> the table is empty for the first call, and I do not see any dataframe
>>>> output after the first one.
>>>>
>>>> *The above steps work well and I can do what I need to in spark-shell;
>>>> the problem is when I code in IntelliJ, because the streaming query keeps
>>>> running and I am not sure how to identify and stop the streaming query and
>>>> use the record memory table.*
>>>>
>>>> So I would like to stop the streaming query once I know I have some
>>>> data in my record memory table (is there a way to do that?), so I can stop
>>>> the streaming query, use the memory table, and fetch my record.
>>>> Any help on how to approach the situation programmatically, or any
>>>> examples, would be highly appreciated.
>>>>
>>>> Regards,
>>>> Satyajit.
>>>>
>>>> On Sun, Dec 10, 2017 at 9:52 PM, Jacek Laskowski <ja...@japila.pl>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>
Re: Infer JSON schema in structured streaming Kafka.
Hi Jacek,

For now, I am using Thread.sleep() on the driver, to make sure my streaming query receives some data and stops, before control reaches the code that queries the memory table. Let me know if there is a better way of handling it.

Regards,
Satyajit.

On Sun, Dec 10, 2017 at 10:43 PM, satyajit vegesna <satyajit.apas...@gmail.com> wrote:

> Hi Jacek,
>
> Thank you for responding back.
>
> I have tried the memory sink, and below is what I did:
>
> val fetchValue = debeziumRecords.selectExpr("value")
>   .withColumn("tableName", functions.get_json_object($"value".cast(StringType), "$.schema.name"))
>   .withColumn("operation", functions.get_json_object($"value".cast(StringType), "$.payload.op"))
>   .withColumn("payloadAfterValue", split(substring_index(debeziumRecords("value"), "\"after\":", -1), ",\"source\"").getItem(0))
>   .drop("tableName").drop("operation").drop("value").as[String]
>   .writeStream
>   .outputMode(OutputMode.Append())
>   .queryName("record")
>   .format("memory")
>   .start()
>
> spark.sql("select * from record").show(truncate = false) // I was expecting
> to be able to use the record table to read the JSON string, but the table
> is empty for the first call, and I do not see any dataframe output after
> the first one.
>
> *The above steps work well and I can do what I need to in spark-shell; the
> problem is when I code in IntelliJ, because the streaming query keeps
> running and I am not sure how to identify and stop the streaming query and
> use the record memory table.*
>
> So I would like to stop the streaming query once I know I have some data
> in my record memory table (is there a way to do that?), so I can stop the
> streaming query, use the memory table, and fetch my record.
> Any help on how to approach the situation programmatically, or any
> examples, would be highly appreciated.
>
> Regards,
> Satyajit.
>
> On Sun, Dec 10, 2017 at 9:52 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>
>> Hi,
>>
>> What about memory sink? That could work.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>>
>> https://about.me/JacekLaskowski
>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna <
>> satyajit.apas...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I would like to infer the JSON schema from a sample of the data I
>>> receive from Kafka streams (a specific topic), and I have to infer the
>>> schema as I am going to receive random JSON strings with a different
>>> schema for each topic, so I chose to go ahead with the steps below:
>>>
>>> a. readStream from Kafka (latest offset), from a single Kafka topic.
>>> b. Somehow store the JSON string into a val and infer the schema.
>>> c. Stop the stream.
>>> d. Create a new readStream (smallest offset) and use the above inferred
>>> schema to process the JSON using Spark-provided JSON support, like
>>> from_json, json_object and others, and run my actual business logic.
>>>
>>> Now I am not sure how to succeed with step (b). Any help would be
>>> appreciated. I would also like to know if there is a better approach.
>>>
>>> Regards,
>>> Satyajit.
>>>
>>
>
Re: Infer JSON schema in structured streaming Kafka.
Hi Jacek,

Thank you for responding back.

I have tried the memory sink, and below is what I did:

val fetchValue = debeziumRecords.selectExpr("value")
  .withColumn("tableName", functions.get_json_object($"value".cast(StringType), "$.schema.name"))
  .withColumn("operation", functions.get_json_object($"value".cast(StringType), "$.payload.op"))
  .withColumn("payloadAfterValue", split(substring_index(debeziumRecords("value"), "\"after\":", -1), ",\"source\"").getItem(0))
  .drop("tableName").drop("operation").drop("value").as[String]
  .writeStream
  .outputMode(OutputMode.Append())
  .queryName("record")
  .format("memory")
  .start()

spark.sql("select * from record").show(truncate = false) // I was expecting to be able to use the record table to read the JSON string, but the table is empty for the first call, and I do not see any dataframe output after the first one.

*The above steps work well and I can do what I need to in spark-shell; the problem is when I code in IntelliJ, because the streaming query keeps running and I am not sure how to identify and stop the streaming query and use the record memory table.*

So I would like to stop the streaming query once I know I have some data in my record memory table (is there a way to do that?), so I can stop the streaming query, use the memory table, and fetch my record. Any help on how to approach the situation programmatically, or any examples, would be highly appreciated.

Regards,
Satyajit.

On Sun, Dec 10, 2017 at 9:52 PM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> What about memory sink? That could work.
>
> Pozdrawiam,
> Jacek Laskowski
>
> https://about.me/JacekLaskowski
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna <
> satyajit.apas...@gmail.com> wrote:
>
>> Hi All,
>>
>> I would like to infer the JSON schema from a sample of the data I
>> receive from Kafka streams (a specific topic), and I have to infer the
>> schema as I am going to receive random JSON strings with a different
>> schema for each topic, so I chose to go ahead with the steps below:
>>
>> a. readStream from Kafka (latest offset), from a single Kafka topic.
>> b. Somehow store the JSON string into a val and infer the schema.
>> c. Stop the stream.
>> d. Create a new readStream (smallest offset) and use the above inferred
>> schema to process the JSON using Spark-provided JSON support, like
>> from_json, json_object and others, and run my actual business logic.
>>
>> Now I am not sure how to succeed with step (b). Any help would be
>> appreciated. I would also like to know if there is a better approach.
>>
>> Regards,
>> Satyajit.
>>
>
Infer JSON schema in structured streaming Kafka.
Hi All,

I would like to infer the JSON schema from a sample of the data I receive from Kafka streams (a specific topic), and I have to infer the schema as I am going to receive random JSON strings with a different schema for each topic, so I chose to go ahead with the steps below:

a. readStream from Kafka (latest offset), from a single Kafka topic.
b. Somehow store the JSON string into a val and infer the schema.
c. Stop the stream.
d. Create a new readStream (smallest offset) and use the above inferred schema to process the JSON using Spark-provided JSON support, like from_json, json_object and others, and run my actual business logic.

Now I am not sure how to succeed with step (b). Any help would be appreciated. I would also like to know if there is a better approach.

Regards,
Satyajit.
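The batch-read approach suggested later in this thread avoids step (b)'s stop-the-stream problem entirely: sample the topic with a batch Kafka read, infer the schema there, then start the real stream. A sketch assuming Spark 2.2+ (broker address and topic name are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("inferJsonSchema").getOrCreate()
import spark.implicits._

// (b) Batch-read a sample of the topic and let the JSON reader infer a schema.
val sample = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]
val schema = spark.read.json(sample).schema  // Dataset[String] overload, Spark 2.2+

// (d) Start the actual stream from the earliest offset, parsing with that schema.
val parsed = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "earliest")
  .load()
  .select(from_json($"value".cast("string"), schema).as("data"))
```

No stream needs to be stopped: the batch read finishes on its own, and the inferred schema is an ordinary driver-side value by the time readStream starts.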
RDD[internalRow] -> DataSet
Hi All,

Is there a way to convert an RDD[InternalRow] to a Dataset, from outside the spark sql package?

Regards,
Satyajit.
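One possible workaround in Spark 2.x: ExpressionEncoder lives in the catalyst package but is public, so it can decode InternalRow outside org.apache.spark.sql. This leans on internal APIs (fromRow in particular), so it may break between versions; treat it as a sketch, not a supported path:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class Person(name: String, age: Long)

val spark = SparkSession.builder.master("local[2]").appName("internalRowToDs").getOrCreate()
import spark.implicits._

def toDataset(rdd: RDD[InternalRow]): Dataset[Person] = {
  val objRdd = rdd.mapPartitions { iter =>
    // Bind the encoder inside the task: the bound encoder is not thread-safe.
    val enc = ExpressionEncoder[Person]().resolveAndBind()
    iter.map(enc.fromRow)
  }
  spark.createDataset(objRdd)
}

// Demo: obtain an RDD[InternalRow] from an existing Dataset's physical plan.
val internal = Seq(Person("ann", 30L)).toDS().queryExecution.toRdd
val ds = toDataset(internal)
```

The row layout must match the encoder's schema (here: name String, age Long, in that order), since resolveAndBind binds positionally against the case class's own attributes.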
Re: Json Parsing.
Thank you for the info. Is there a way to get all keys of the JSON, so that I can create a dataframe with the JSON keys, as below?

fieldsDataframe.withColumn("data", functions.get_json_object($"RecordString", "$.id"))

This appends a single column to the dataframe for the id key. I would like to automate this process for all keys in the JSON, as I am going to get dynamically generated JSON schemas.

On Wed, Dec 6, 2017 at 4:37 PM, ayan guha <guha.a...@gmail.com> wrote:

>
> On Thu, 7 Dec 2017 at 11:37 am, ayan guha <guha.a...@gmail.com> wrote:
>
>> You can use get_json function
>>
>> On Thu, 7 Dec 2017 at 10:39 am, satyajit vegesna <
>> satyajit.apas...@gmail.com> wrote:
>>
>>> Does spark support automatic detection of schema from a json string in
>>> a dataframe?
>>>
>>> I am trying to parse a json string and do some transformations on it (I
>>> would like to append new columns to the dataframe) from the data I stream
>>> from kafka.
>>>
>>> But I am not very sure how I can parse the json in structured
>>> streaming. And I would not be interested in creating a schema, as the data
>>> from kafka is going to maintain different schema objects in the value
>>> column.
>>>
>>> Any advice or help would be appreciated.
>>>
>>> Regards,
>>> Satyajit.
>>>
>> --
>> Best Regards,
>> Ayan Guha
>>
> --
> Best Regards,
> Ayan Guha
>
Json Parsing.
Does spark support automatic detection of schema from a json string in a dataframe?

I am trying to parse a json string and do some transformations on it (I would like to append new columns to the dataframe) from the data I stream from kafka.

But I am not very sure how I can parse the json in structured streaming. And I would not be interested in creating a schema, as the data from kafka is going to maintain different schema objects in the value column.

Any advice or help would be appreciated.

Regards,
Satyajit.
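For the batch case, one way to turn every top-level JSON key into its own column without hand-writing a schema is to infer the schema from the string column itself, then expand with from_json. A sketch (column and key names are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[2]").appName("jsonKeys").getOrCreate()
import spark.implicits._

val fieldsDataframe = Seq(
  """{"id": 1, "name": "a"}""",
  """{"id": 2, "name": "b"}"""
).toDF("RecordString")

// Infer the schema from the JSON strings themselves (Spark 2.2+ overload).
val schema = spark.read.json(fieldsDataframe.select("RecordString").as[String]).schema

// from_json + select("data.*") appends one column per top-level key.
val expanded = fieldsDataframe
  .withColumn("data", from_json($"RecordString", schema))
  .select($"RecordString", $"data.*")
```

In a streaming query the same from_json call works, but the schema has to be known before the stream starts, which is why a batch sample is inferred first.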
Re: Spark Project build Issues.(Intellij)
Hi,

I was able to successfully build the project (source code) from IntelliJ. But when I try to run any of the examples present in the $SPARK_HOME/examples folder, I get different errors for different example jobs.

For example, for the StructuredKafkaWordCount example:

Exception in thread "main" java.lang.NoClassDefFoundError: scala/collection/Seq
    at org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKafkaWordCount.scala)
Caused by: java.lang.ClassNotFoundException: scala.collection.Seq
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 1 more

For the LogQuery job:

objc[21879]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/bin/java (0x106ff54c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x1070bd4e0). One of the two will be used. Which one is undefined.
Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: scala/collection/immutable/List
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
    at java.lang.Class.getMethod0(Class.java:3018)
    at java.lang.Class.getMethod(Class.java:1784)
    at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
    at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: scala.collection.immutable.List
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more

On Wed, Jun 28, 2017 at 5:21 PM, Dongjoon Hyun <dongjoon.h...@gmail.com> wrote:

> Did you follow the guide in `IDE Setup` -> `IntelliJ` section of
> http://spark.apache.org/developer-tools.html ?
>
> Bests,
> Dongjoon.
>
> On Wed, Jun 28, 2017 at 5:13 PM, satyajit vegesna <
> satyajit.apas...@gmail.com> wrote:
>
>> Hi All,
>>
>> When I try to build the source code of apache spark from
>> https://github.com/apache/spark.git, I get the errors below:
>>
>> Error:(9, 14) EventBatch is already defined as object EventBatch
>> public class EventBatch extends org.apache.avro.specific.SpecificRecordBase
>> implements org.apache.avro.specific.SpecificRecord {
>> Error:(9, 14) EventBatch is already defined as class EventBatch
>> public class EventBatch extends org.apache.avro.specific.SpecificRecordBase
>> implements org.apache.avro.specific.SpecificRecord {
>> /Users/svegesna/svegesna/dev/scala/spark/external/flume-sink/target/scala-2.11/src_managed/main/compiled_avro/org/apache/spark/streaming/flume/sink/SparkFlumeProtocol.java
>> Error:(26, 18) SparkFlumeProtocol is already defined as object SparkFlumeProtocol
>> public interface SparkFlumeProtocol {
>> Error:(26, 18) SparkFlumeProtocol is already defined as trait SparkFlumeProtocol
>> public interface SparkFlumeProtocol {
>> /Users/svegesna/svegesna/dev/scala/spark/external/flume-sink/target/scala-2.11/src_managed/main/compiled_avro/org/apache/spark/streaming/flume/sink/SparkSinkEvent.java
>> Error:(9, 14) SparkSinkEvent is already defined as object SparkSinkEvent
>> public class SparkSinkEvent extends org.apache.avro.specific.SpecificRecordBase
>> implements org.apache.avro.specific.SpecificRecord {
>> Error:(9, 14) SparkSinkEvent is already defined as class SparkSinkEvent
>> public class SparkSinkEvent extends org.apache.avro.specific.SpecificRecordBase
>> implements org.apache.avro.specific.SpecificRecord {
>>
>> I would like to know if I can successfully build the project, so that I
>> can test and debug some of spark's functionalities.
>>
>> Regards,
>> Satyajit.
>>
>
Spark Project build Issues.(Intellij)
Hi All,

When I try to build the source code of apache spark from https://github.com/apache/spark.git, I get the errors below:

Error:(9, 14) EventBatch is already defined as object EventBatch
public class EventBatch extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
Error:(9, 14) EventBatch is already defined as class EventBatch
public class EventBatch extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
/Users/svegesna/svegesna/dev/scala/spark/external/flume-sink/target/scala-2.11/src_managed/main/compiled_avro/org/apache/spark/streaming/flume/sink/SparkFlumeProtocol.java
Error:(26, 18) SparkFlumeProtocol is already defined as object SparkFlumeProtocol
public interface SparkFlumeProtocol {
Error:(26, 18) SparkFlumeProtocol is already defined as trait SparkFlumeProtocol
public interface SparkFlumeProtocol {
/Users/svegesna/svegesna/dev/scala/spark/external/flume-sink/target/scala-2.11/src_managed/main/compiled_avro/org/apache/spark/streaming/flume/sink/SparkSinkEvent.java
Error:(9, 14) SparkSinkEvent is already defined as object SparkSinkEvent
public class SparkSinkEvent extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
Error:(9, 14) SparkSinkEvent is already defined as class SparkSinkEvent
public class SparkSinkEvent extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {

I would like to know if I can successfully build the project, so that I can test and debug some of spark's functionalities.

Regards,
Satyajit.
Re: Building Kafka 0.10 Source for Structured Streaming Error.
I have updated the pom.xml in the external/kafka-0-10-sql folder (the addition is the maven-assembly-plugin section at the end, which was highlighted in the original mail) and have run the command

build/mvn package -DskipTests -pl external/kafka-0-10-sql

which generated spark-sql-kafka-0-10_2.11-2.3.0-SNAPSHOT-jar-with-dependencies.jar.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
    <version>2.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <properties>
    <sbt.project.name>sql-kafka-0-10</sbt.project.name>
  </properties>
  <packaging>jar</packaging>
  <name>Kafka 0.10 Source for Structured Streaming</name>
  <url>http://spark.apache.org/</url>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.0.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_${scala.binary.version}</artifactId>
      <version>0.10.0.1</version>
    </dependency>
    <dependency>
      <groupId>net.sf.jopt-simple</groupId>
      <artifactId>jopt-simple</artifactId>
      <version>3.2</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_${scala.binary.version}</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-tags_${scala.binary.version}</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-tags_${scala.binary.version}</artifactId>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Regards,
Satyajit.

On Wed, Jun 28, 2017 at 12:12 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

> "--package" will add transitive dependencies that are not in
> "$SPARK_HOME/external/kafka-0-10-sql/target/*.jar".
>
> > i have tried building the jar with dependencies, but still face the
> > same error.
>
> What's the command you used?
>
> On Wed, Jun 28, 2017 at 12:00 PM, satyajit vegesna <
> satyajit.apas...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am trying to build the Kafka-0-10-sql module under the external folder
>> in the apache spark source code.
>> Once I generate the jar file using
>> build/mvn package -DskipTests -pl external/kafka-0-10-sql
>> I get the jar file created under external/kafka-0-10-sql/target.
>>
>> I then try to run spark-shell with the jars created in the target folder
>> as below:
>> bin/spark-shell --jars $SPARK_HOME/external/kafka-0-10-sql/target/*.jar
>>
>> and get the error below:
>>
>> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
>> Setting default log level to "WARN".
>> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
>> 17/06/28 11:54:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>> Spark context Web UI available at http://10.1.10.241:4040
>> Spark context available as 'sc' (master = local[*], app id = local-1498676043936).
>> Spark session available as 'spark'.
>> Welcome to
>>       ____              __
>>      / __/__  ___ _____/ /__
>>     _\ \/ _ \/ _ `/ __/  '_/
>>    /___/ .__/\_,_/_/ /_/\_\   version 2.3.0-SNAPSHOT
>>       /_/
>>
>> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131)
>> Type in expressions to have them evaluated.
>> Type :help for more information.
>>
>> scala> val lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test").load()
>>
>> java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
>>   at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:378)
>>   at o
Building Kafka 0.10 Source for Structured Streaming Error.
Hi All,

I am trying to build the Kafka-0-10-sql module under the external folder in the apache spark source code.
Once I generate the jar file using
build/mvn package -DskipTests -pl external/kafka-0-10-sql
I get the jar file created under external/kafka-0-10-sql/target.

I then try to run spark-shell with the jars created in the target folder as below:
bin/spark-shell --jars $SPARK_HOME/external/kafka-0-10-sql/target/*.jar

and get the error below:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/06/28 11:54:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://10.1.10.241:4040
Spark context available as 'sc' (master = local[*], app id = local-1498676043936).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.0-SNAPSHOT
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test").load()

java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:378)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:325)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:60)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:192)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
  at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
  ... 48 elided
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 57 more

I have tried building the jar with dependencies, but I still face the same error. But when I run spark-shell with --packages, using bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0, it works fine.

The reason I am trying to build from source code is that I want to try pushing dataframe data into a kafka topic, based on https://github.com/apache/spark/commit/b0a5cd89097c563e9949d8cfcf84d18b03b8d24c, which doesn't work with version 2.1.0.

Any help would be highly appreciated.

Regards,
Satyajit.
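The ByteArrayDeserializer ClassNotFoundException points at the Kafka client jars missing from the classpath: --jars does not resolve transitive dependencies, unlike --packages. A command sketch of the two options (jar paths and versions are illustrative, taken from a default local Maven repository layout):

```shell
# Option 1: build the module, then pass its jar AND the kafka-clients jar
# explicitly, since --jars pulls in nothing transitively.
build/mvn package -DskipTests -pl external/kafka-0-10-sql

bin/spark-shell --jars \
  "$SPARK_HOME/external/kafka-0-10-sql/target/spark-sql-kafka-0-10_2.11-2.3.0-SNAPSHOT.jar,\
$HOME/.m2/repository/org/apache/kafka/kafka-clients/0.10.0.1/kafka-clients-0.10.0.1.jar"

# Option 2: let Maven coordinates resolve everything (note: --packages, plural).
bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
```

The jar-with-dependencies assembly discussed above is a third route to the same end, provided the assembly actually bundles kafka-clients.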
Null pointer exception with RDD while computing a method, creating dataframe.
Hi All,

PFB sample code:

val df = spark.read.parquet()
df.registerTempTable("df")
val zip = df.select("zip_code").distinct().as[String].rdd

def comp(zipcode: String): Unit = {
  val zipval = "SELECT * FROM df WHERE zip_code='$zipvalrepl'".replace("$zipvalrepl", zipcode)
  val data = spark.sql(zipval) // throwing a null pointer exception with the RDD
  data.write.parquet(..)
}

val sam = zip.map(x => comp(x))
sam.count

But when I instead do

val zip = df.select("zip_code").distinct().as[String].rdd.collect

and call the function, the data gets computed, but in sequential order.

I would like to know why I get a null pointer exception when running map on the RDD, and whether there is a way to compute the comp function for each zipcode in parallel, i.e. run multiple zipcodes at the same time.

Any clue or inputs are appreciated.

Regards.
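The NPE comes from using the SparkSession inside an RDD closure: spark and df exist only on the driver, so spark.sql is null when comp runs on an executor. One driver-side alternative that still parallelizes the per-zip-code writes is a single partitioned write; a sketch with toy data (paths and column names are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[2]").appName("zipSplit").getOrCreate()
import spark.implicits._

val df = Seq(("94401", "a"), ("94010", "b")).toDF("zip_code", "payload")

// One job, one output directory per zip code:
//   <out>/zip_code=94401/..., <out>/zip_code=94010/...
// Spark writes all partitions in parallel, no driver-side loop needed.
val out = java.nio.file.Files.createTempDirectory("zips").toString
df.write.partitionBy("zip_code").mode("overwrite").parquet(out)
```

If per-zip-code processing (not just splitting) is needed, collecting the distinct codes to the driver and submitting the jobs concurrently (e.g. over a parallel collection) is another common workaround, since multiple jobs can run at once within one SparkContext.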
Document Similarity -Spark Mllib
Hi All,

I am trying to implement a Spark MLlib job to find the similarity between documents (in my case, basically home addresses). I believe I cannot use DIMSUM for my use case, since DIMSUM works well only on matrices with thin columns and many rows.

Example matrix layout for my use case (m, the number of rows, is going to be huge as I have more addresses, and n is going to be thin compared to m):

                doc1(address1)  doc2(address2)  ...
san mateo       0.73462         0
san francisco   ..              ..
san bruno       ....            ..

I would like to know if there is a way to leverage DIMSUM to work on my use case, and if not, what other algorithm I can try that is available in Spark MLlib.

Regards,
Satyajit.
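For reference, DIMSUM in MLlib is exposed through RowMatrix.columnSimilarities, and it computes similarities between COLUMNS, so documents must be laid out as columns (rows = features) for it to apply. A toy sketch of both the exact and the threshold (DIMSUM-sampled) variants:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("colSim"))

// Each Vector is one FEATURE across 3 documents (documents are columns).
val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 1.0, 0.0),
  Vectors.dense(0.0, 1.0, 1.0)))

val mat = new RowMatrix(rows)
val exact  = mat.columnSimilarities()      // brute-force cosine similarities
val approx = mat.columnSimilarities(0.1)   // DIMSUM sampling with a threshold
```

Since the number of documents here is huge, columns would be huge too, which is exactly the regime DIMSUM is not designed for; for many-document similarity, approximate approaches such as LSH over the TF-IDF vectors are a common alternative (MinHash/LSH landed in spark.ml in the 2.1 timeframe).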
Issue in using DenseVector in RowMatrix, error could be due to ml and mllib package changes
Hi All,

PFB code:

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by satyajit on 12/7/16.
 */
object DIMSUMusingtf extends App {

  val conf = new SparkConf()
    .setMaster("local[1]")
    .setAppName("testColsim")
  val sc = new SparkContext(conf)
  val spark = SparkSession
    .builder
    .appName("testColSim").getOrCreate()

  val sentenceData = spark.createDataFrame(Seq(
    (0, "Hi I heard about Spark"),
    (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat")
  )).toDF("label", "sentence")

  val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
  val wordsData = tokenizer.transform(sentenceData)

  val hashingTF = new HashingTF()
    .setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
  val featurizedData = hashingTF.transform(wordsData)

  val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
  val idfModel = idf.fit(featurizedData)
  val rescaledData = idfModel.transform(featurizedData)
  rescaledData.show()
  rescaledData.select("features", "label").take(3).foreach(println)

  val check = rescaledData.select("features")
  val row = check.rdd.map(row => row.getAs[SparseVector]("features"))
  val mat = new RowMatrix(row) // I am basically trying to use the vectors as a
  // direct input to RowMatrix, but I get an error that RowMatrix "Cannot
  // resolve constructor"
  row.foreach(println)
}

Any help would be appreciated.

Regards,
Satyajit.
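The constructor fails because HashingTF/IDF from the ml package produce org.apache.spark.ml.linalg vectors, while RowMatrix belongs to mllib and expects org.apache.spark.mllib.linalg.Vector. One sketch of a fix is converting each row with mllib's Vectors.fromML (available from Spark 2.0); the pipeline below is a trimmed, self-contained version of the code above:

```scala
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[2]").appName("fixRowMatrix").getOrCreate()

val df = spark.createDataFrame(Seq((0, "Hi I heard about Spark"))).toDF("label", "sentence")
val words = new Tokenizer().setInputCol("sentence").setOutputCol("words").transform(df)
val feats = new HashingTF()
  .setInputCol("words").setOutputCol("features").setNumFeatures(20)
  .transform(words)

// Convert the new ml vectors to old mllib vectors before building RowMatrix.
val oldRows = feats.select("features").rdd
  .map(r => OldVectors.fromML(r.getAs[MLVector]("features")))
val mat = new RowMatrix(oldRows)
```

Matching on the interface type (ml.linalg.Vector) rather than SparseVector also avoids a ClassCastException when a transform happens to produce a dense vector.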
Issues in compiling spark 2.0.0 code using scala-maven-plugin
Hi All, I am trying to compile code using Maven, which was working with Spark 1.6.2, but when I try with Spark 2.0.0 I get the below error:

org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile (default) on project NginxLoads-repartition: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:212)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
    at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128)
    at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:307)
    at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:193)
    at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:106)
    at org.apache.maven.cli.MavenCli.execute(MavenCli.java:863)
    at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:288)
    at org.apache.maven.cli.MavenCli.main(MavenCli.java:199)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
Caused by: org.apache.maven.plugin.MojoExecutionException: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1)
    at scala_maven.ScalaMojoSupport.execute(ScalaMojoSupport.java:490)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:207)
    ... 20 more
Caused by: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1)
    at org.apache.commons.exec.DefaultExecutor.executeInternal(DefaultExecutor.java:377)
    at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:160)
    at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:147)
    at scala_maven_executions.JavaMainCallerByFork.run(JavaMainCallerByFork.java:100)
    at scala_maven.ScalaCompilerSupport.compile(ScalaCompilerSupport.java:161)
    at scala_maven.ScalaCompilerSupport.doExecute(ScalaCompilerSupport.java:99)
    at scala_maven.ScalaMojoSupport.execute(ScalaMojoSupport.java:482)
    ... 22 more

PFB the pom.xml that I am using; any help would be appreciated.

http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;>
4.0.0
NginxLoads-repartition NginxLoads-repartition 1.1-SNAPSHOT ${project.artifactId}
This is a boilerplate maven project to start using Spark in Scala
2010
1.6 1.6 UTF-8 2.11 2.11 2.11.8
cloudera-repo-releases https://repository.cloudera.com/artifactory/repo/
src/main/scala src/test/scala
maven-assembly-plugin: package single jar-with-dependencies
org.apache.maven.plugins maven-compiler-plugin 3.5.1: 1.7 1.7
net.alchim31.maven scala-maven-plugin 3.2.2: compile testCompile -make:transitive
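One hedged observation for readers of this thread: the plugin configuration above passes -make:transitive to scalac, and the -make option was removed in Scala 2.11, so a build that compiled fine against Spark 1.6 / Scala 2.10 can start failing once the project moves to Spark 2.0 / Scala 2.11. A sketch of the plugin block without that argument:

```xml
<plugin>
  <groupId>net.alchim31.maven</groupId>
  <artifactId>scala-maven-plugin</artifactId>
  <version>3.2.2</version>
  <executions>
    <execution>
      <goals>
        <goal>compile</goal>
        <goal>testCompile</goal>
      </goals>
      <!-- no <args><arg>-make:transitive</arg></args> here: scalac 2.11 rejects -make -->
    </execution>
  </executions>
</plugin>
```

Running mvn with -e or -X would also surface the actual scalac message hiding behind the generic "Process exited with an error: 1".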
Spark on YARN: only 1 or 2 vcores getting allocated to the containers being created.
Hi All, I am trying to run a Spark job using YARN, and I specify the --executor-cores value as 20. But when I check the "Nodes of the cluster" page at http://hostname:8088/cluster/nodes, I see 4 containers getting created on each node in the cluster, with only 1 vcore assigned per container, even when I specify --executor-cores 20 while submitting the job with spark-submit.

yarn-site.xml:

yarn.scheduler.maximum-allocation-mb      6
yarn.scheduler.minimum-allocation-vcores  1
yarn.scheduler.maximum-allocation-vcores  40
yarn.nodemanager.resource.memory-mb       7
yarn.nodemanager.resource.cpu-vcores      20

Did anyone face the same issue? Regards, Satyajit.
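A hedged note for anyone debugging the same symptom: with the CapacityScheduler's default DefaultResourceCalculator, containers are sized by memory only and the UI reports 1 vcore per container regardless of --executor-cores. If that is what is happening here, switching the resource calculator in capacity-scheduler.xml makes vcore requests take effect:

```xml
<!-- capacity-scheduler.xml -->
<property>
  <name>yarn.scheduler.capacity.resource-calculator</name>
  <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>
```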
HiveContext: difficulties in accessing tables in Hive schemas/databases other than the default database.
Hi All, I have been trying to access tables from schemas other than default, to pull data into a DataFrame. I was successful in doing it using the default schema in the Hive database, but when I try any other schema/database in Hive, I get the below error. (I have also not seen any examples of accessing tables in a schema/database other than default.)

16/07/19 18:16:06 INFO hive.metastore: Connected to metastore.
16/07/19 18:16:08 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 472.3 KB, free 472.3 KB)
16/07/19 18:16:08 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 39.6 KB, free 511.9 KB)
16/07/19 18:16:08 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:41434 (size: 39.6 KB, free: 2.4 GB)
16/07/19 18:16:08 INFO spark.SparkContext: Created broadcast 0 from show at sparkHive.scala:70
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hive.ql.exec.Utilities.copyTableJobPropertiesToConf(Lorg/apache/hadoop/hive/ql/plan/TableDesc;Lorg/apache/hadoop/mapred/JobConf;)V
    at org.apache.spark.sql.hive.HadoopTableReader$.initializeLocalJobConfFunc(TableReader.scala:324)
    at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$12.apply(TableReader.scala:276)
    at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$12.apply(TableReader.scala:276)
    at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176)
    at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:176)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:195)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413)
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495)
    at ...
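Leaving aside the NoSuchMethodError itself (which looks like a Hive jar version mismatch on the classpath, though that is only a guess), the usual ways to reach a non-default database from a HiveContext are sketched below; db1 and my_table are placeholder names, and this assumes a Spark build with Hive support:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("hive-db-sketch").setMaster("local[2]"))
val hiveContext = new HiveContext(sc)

// Set up a non-default database just to demonstrate (illustrative names).
hiveContext.sql("CREATE DATABASE IF NOT EXISTS db1")
hiveContext.sql("CREATE TABLE IF NOT EXISTS db1.my_table (id INT, name STRING)")

// Either qualify the table name with its database...
val df1 = hiveContext.sql("SELECT * FROM db1.my_table")

// ...or switch the current database and use the bare table name.
hiveContext.sql("USE db1")
val df2 = hiveContext.table("my_table")
```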
Fwd: Master options Cluster/Client discrepancies.
Hi All, I have written a Spark program on my dev box:

IDE: IntelliJ
Scala version: 2.11.7
Spark version: 1.6.1

It runs fine from the IDE when I provide proper input and output paths, including the master. But I try to deploy the code on my cluster, which is made up of the below:

Spark version 1.6.1, built from the source package using Scala 2.11 (but when I run spark-shell on the cluster, it reports the Scala version as 2.10.5)
Hadoop YARN cluster 2.6.0

and with the additional options --executor-memory, --total-executor-cores, --deploy-mode cluster/client, --master yarn, I get:

Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
    at com.movoto.SparkPost$.main(SparkPost.scala:36)
    at com.movoto.SparkPost.main(SparkPost.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I understand this to be a Scala version issue, as I have faced it before. Is there something I can change to get the same program running on the cluster? Regards, Satyajit.
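For readers hitting the same scala.Predef$.$conforms() NoSuchMethodError: it is the classic symptom of compiling against one Scala major version and running on another, so the application's artifact suffixes (_2.10 vs _2.11) must match what the cluster's Spark was actually built with (the "Using Scala version" banner that spark-shell prints). A small shell sketch of that check (the version strings are illustrative):

```shell
# What spark-shell on the cluster reports in its "Using Scala version" banner:
cluster_scala="2.10"
# What the application's pom.xml/build.sbt declares for its Spark dependencies:
artifact="spark-core_2.11"

# The suffix after the underscore must equal the cluster's Scala major version.
suffix="${artifact##*_}"
if [ "$suffix" = "$cluster_scala" ]; then
  echo "scala versions match"
else
  echo "mismatch: app built for Scala $suffix, cluster runs Scala $cluster_scala"
fi
```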
Fwd: Apache Spark Exception in thread “main” java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
Hi, Scala version: 2.11.7 (had to upgrade the Scala version to enable case classes to accept more than 22 parameters). Spark version: 1.6.1. PFB pom.xml. Getting the below error when trying to set up Spark in the IntelliJ IDE:

16/03/16 18:36:44 INFO spark.SparkContext: Running Spark version 1.6.1
Exception in thread "main" java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
    at org.apache.spark.util.TimeStampedWeakValueHashMap.<init>(TimeStampedWeakValueHashMap.scala:42)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:298)
    at com.examples.testSparkPost$.main(testSparkPost.scala:27)
    at com.examples.testSparkPost.main(testSparkPost.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ...
... 9 more

pom.xml:

http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;>
4.0.0
StreamProcess StreamProcess 0.0.1-SNAPSHOT ${project.artifactId}
This is a boilerplate maven project to start using Spark in Scala
2010
1.6 1.6 UTF-8 2.10 2.11.7
cloudera-repo-releases https://repository.cloudera.com/artifactory/repo/
src/main/scala src/test/scala
maven-assembly-plugin: package single jar-with-dependencies
net.alchim31.maven scala-maven-plugin 3.2.2: compile testCompile -dependencyfile ${project.build.directory}/.scala_dependencies
maven-assembly-plugin 2.4.1: jar-with-dependencies make-assembly package single
org.scala-lang scala-library ${scala.version}
org.mongodb.mongo-hadoop mongo-hadoop-core 1.4.2 (excludes javax.servlet servlet-api)
org.mongodb mongodb-driver 3.2.2 (excludes javax.servlet servlet-api)
org.mongodb mongodb-driver 3.2.2 (excludes javax.servlet servlet-api)
org.apache.spark spark-streaming_2.10 1.6.1
org.apache.spark spark-core_2.10 1.6.1
org.apache.spark spark-sql_2.10 1.6.1
org.apache.hadoop hadoop-hdfs 2.6.0
org.apache.hadoop hadoop-auth 2.6.0
org.apache.hadoop
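For readers hitting the same NoClassDefFoundError: the pom above declares scala-library 2.11.7 alongside _2.10 Spark artifacts, and mixing Scala 2.10-built jars with a 2.11 scala-library is a known-incompatible combination that surfaces exactly as missing scala.collection classes such as GenTraversableOnce$class. A sketch of consistent coordinates (versions shown are illustrative):

```xml
<properties>
  <scala.version>2.11.7</scala.version>
  <scala.compat.version>2.11</scala.compat.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <!-- The Spark artifact suffix must match the Scala major version. -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.compat.version}</artifactId>
    <version>1.6.1</version>
  </dependency>
</dependencies>
```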
Fwd: DF creation
Hi, I am trying to create a separate val reference to an object of the case class data (as shown below):

case class data(name: String, age: String)

Creation of this object is done separately, and the reference to the object is stored into val data. I use

val samplerdd = sc.parallelize(Seq(data))

to create the RDD:

org.apache.spark.rdd.RDD[data] = ParallelCollectionRDD[10] at parallelize at :24

Is there a way to create a DataFrame out of this without using createDataFrame, i.e. by using toDF(), which I was unable to get to convert? (I would like to avoid providing the StructType.) Regards, Satyajit.
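A sketch of the pattern that usually works (class and data names are illustrative; on Spark 1.6 the implicits come from sqlContext.implicits._ rather than a SparkSession): define the case class at the top level, import the session's implicits, and toDF() then infers the schema from the case class, so no StructType is needed:

```scala
import org.apache.spark.sql.SparkSession

// The case class must be defined at the top level, not inside a method.
case class Person(name: String, age: String)

val spark = SparkSession.builder.master("local[1]").appName("toDF-sketch").getOrCreate()
import spark.implicits._ // brings toDF() into scope for RDDs of case classes

val samplerdd = spark.sparkContext.parallelize(Seq(Person("ann", "34"), Person("bob", "29")))
val df = samplerdd.toDF() // schema (name, age) inferred from the case class
df.printSchema()
```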
Data not getting printed in Spark Streaming with print().
Hi All, I am trying to run the HdfsWordCount example from GitHub:

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala

I am using Ubuntu to run the program, but I don't see any data getting printed after:

---
Time: 145402680 ms
---

I don't see any errors; the program just runs, but I do not see any output of the data corresponding to the file used.

object HdfsStream {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SpoolDirSpark").setMaster("local[5]")
    val ssc = new StreamingContext(sparkConf, Minutes(10))

    //val inputDirectory = "hdfs://localhost:9000/SpoolDirSpark"
    //val inputDirectory = "hdfs://localhost:9000/SpoolDirSpark/test.txt"
    val inputDirectory = "file:///home/satyajit/jsondata/"

    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](inputDirectory)
      .map { case (x, y) => (x.toString, y.toString) }

    //lines.saveAsTextFiles("hdfs://localhost:9000/SpoolDirSpark/datacheck")
    lines.saveAsTextFiles("file:///home/satyajit/jsondata/")
    println("check_data" + lines.print())

    ssc.start()
    ssc.awaitTermination()
  }
}

Would like to know if there is any workaround, or if there is something I am missing. Thanking in advance, Satyajit.
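Two hedged guesses worth ruling out, since both are common with this example: fileStream only picks up files created or moved into the directory after the stream starts (pre-existing files are ignored), and with a Minutes(10) batch interval nothing prints for ten minutes. A sketch with a short interval (paths are illustrative; the timeout is only so the sketch terminates on its own):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("SpoolDirSpark").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(10)) // short batch interval while debugging

// textFileStream is a convenience wrapper over fileStream for plain text files.
// Only files that appear in this directory *after* ssc.start() are processed.
val lines = ssc.textFileStream("file:///home/satyajit/jsondata/")
lines.print() // prints the first elements of each batch to stdout

ssc.start()
ssc.awaitTerminationOrTimeout(15000) // a real job would use ssc.awaitTermination()
ssc.stop()
```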
Parquet SaveMode.Append Trouble.
Hi, I am new to using Spark and Parquet files. Below is what I am trying to do on spark-shell:

val df = sqlContext.parquetFile("/data/LM/Parquet/Segment/pages/part-m-0.gz.parquet")

Have also tried the below command:

val df = sqlContext.read.format("parquet").load("/data/LM/Parquet/Segment/pages/part-m-0.gz.parquet")

Now I have another existing Parquet file to which I want to append this Parquet file data of df. So I use:

df.save("/data/LM/Parquet/Segment/pages2/part-m-0.gz.parquet", "parquet", SaveMode.Append)

Also tried the below command:

df.save("/data/LM/Parquet/Segment/pages2/part-m-0.gz.parquet", SaveMode.Append)

and it throws me the below error:

console:26: error: not found: value SaveMode
df.save("/data/LM/Parquet/Segment/pages2/part-m-0.gz.parquet", "parquet", SaveMode.Append)

Please help me in case I am doing something wrong here. Regards, Satyajit.
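The "not found: value SaveMode" error is just a missing import on the shell: SaveMode lives in org.apache.spark.sql. A sketch, assuming a Spark 2.x SparkSession (on 1.x the same import works with sqlContext); the path is illustrative, and df.write.mode(...) is the non-deprecated replacement for df.save:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder.master("local[1]").appName("savemode-sketch").getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

// Once org.apache.spark.sql.SaveMode is imported, SaveMode.Append resolves.
df.write
  .mode(SaveMode.Append)                   // appends to any data already at the path
  .parquet("/tmp/savemode-sketch.parquet") // illustrative target path

val back = spark.read.parquet("/tmp/savemode-sketch.parquet")
```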