Re: Spark as Relational Database
Hi,

It is very easy to integrate Cassandra in a use case such as this. For instance, do your joins in Spark and do your data storage in Cassandra, which allows a very flexible schema, unlike a relational DB, and is fast and fault tolerant; with Spark co-located with Cassandra for data locality it is faster still. If you use the Spark Cassandra Connector, reading and writing to Cassandra is as simple as:

write - DStream or RDD:
stream.map(RawData(_)).saveToCassandra(keyspace, table)

read - SparkContext or StreamingContext:
ssc.cassandraTable[Double](keyspace, dailytable)
  .select("precipitation")
  .where("weather_station = ? AND year = ?", wsid, year)
  .map(doWork)

In your build:
"com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-alpha4"  // our 1.1.0 is for Spark 1.1

https://github.com/datastax/spark-cassandra-connector
docs: https://github.com/datastax/spark-cassandra-connector/tree/master/doc

- Helena
twitter: @helenaedelson

On Oct 26, 2014, at 10:05 AM, Rick Richardson rick.richard...@gmail.com wrote:

Spark's API definitely covers all of the things that a relational database can do. It will probably outperform a relational star schema if all of your *working* data set can fit into RAM on your cluster. It will still perform quite well if most of the data fits and some has to spill over to disk.

What are your requirements exactly? What is "massive amounts of data" exactly? How big is your cluster?

Note that Spark is not for data storage, only data analysis. It pulls data into working data sets called RDDs. As a migration path, you could probably pull the data out of a relational database to analyze. But in the long run, I would recommend a more purpose-built, huge-storage database such as Cassandra. If your data is very static, you could also just store it in files.

On Oct 26, 2014 9:19 AM, Peter Wolf opus...@gmail.com wrote:

My understanding is that SparkSQL allows one to access Spark data as if it were stored in a relational database. It compiles SQL queries into a series of calls to the Spark API. I need the performance of a SQL database, but I don't care about doing queries with SQL.

I create the input to MLlib by doing a massive JOIN query. So, I am creating a single collection by combining many collections. This sort of operation is very inefficient in Mongo, Cassandra or HDFS. I could store my data in a relational database and copy the query results to Spark for processing. However, I was hoping I could keep everything in Spark.

On Sat, Oct 25, 2014 at 11:34 PM, Soumya Simanta soumya.sima...@gmail.com wrote:

1. What data store do you want to store your data in? HDFS, HBase, Cassandra, S3 or something else?
2. Have you looked at SparkSQL (https://spark.apache.org/sql/)?

One option is to process the data in Spark and then store it in the relational database of your choice.

On Sat, Oct 25, 2014 at 11:18 PM, Peter Wolf opus...@gmail.com wrote:

Hello all,

We are considering Spark for our organization. It is obviously a superb platform for processing massive amounts of data... how about retrieving it?

We are currently storing our data in a relational database in a star schema. Retrieving our data requires doing many complicated joins across many tables. Can we use Spark as a relational database? Or, if not, can we put Spark on top of a relational database?

Note that we don't care about SQL. Accessing our data via standard queries is nice, but we are equally happy (or more happy) to write Scala code. What is important to us is doing relational queries on huge amounts of data. Is Spark good at this?

Thank you very much in advance.
Peter
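As a rough illustration of the relational-style workload discussed in this thread, a minimal Scala sketch of a star-schema-like join done as plain RDD operations; the data and field meanings here are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object StarSchemaJoin {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("StarSchemaJoin").setMaster("local[2]"))

    // hypothetical fact and dimension collections, both keyed by customerId
    val orders    = sc.parallelize(Seq((1L, 20.0), (2L, 35.0), (1L, 10.0)))   // (customerId, amount)
    val customers = sc.parallelize(Seq((1L, "EMEA"), (2L, "APAC")))           // (customerId, region)

    // relational-style inner join, then aggregate revenue per region
    val revenueByRegion = orders.join(customers)
      .map { case (_, (amount, region)) => (region, amount) }
      .reduceByKey(_ + _)

    revenueByRegion.collect().foreach(println)
    sc.stop()
  }
}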
Re: NoSuchMethodError: cassandra.thrift.ITransportFactory.openTransport()
Hi Sasi,

Thrift is not needed to integrate Cassandra with Spark. In fact the only dependency you need is spark-cassandra-connector_2.10-1.1.0-alpha3.jar, and you can upgrade to alpha4; we're publishing beta very soon.

For future reference, questions/tickets can be created here: https://github.com/datastax/spark-cassandra-connector/issues

Thanks,
- Helena
@helenaedelson

On Oct 27, 2014, at 6:00 AM, Sasi sasikumar@gmail.com wrote:

Apache Spark Team,

We recently started experimenting with Apache Spark for high speed data retrieval. We downloaded the Apache Spark source code, installed Git, packaged the assembly, installed Scala and ran some examples mentioned in the documentation. We did all these steps on Windows XP. Up to this point, everything seems to be fine.

After that, we tried to connect Apache Spark with Cassandra. We downloaded Cassandra from DataStax and installed it, then developed a small sample for connecting to Cassandra using the https://github.com/datastax/spark-cassandra-connector/blob/b1.0/doc/0_quick_start.md link. During spark-submit, we faced some JAR-related issues and resolved them using the --jars option for spark-submit. However, we are stuck with NoSuchMethodError: cassandra.thrift.ITransportFactory.openTransport(). Find enclosed image for the complete error: http://apache-spark-user-list.1001560.n3.nabble.com/file/n17338/Error.png

We included the following JARs using the --jars option for spark-submit:
a) apache-cassandra-thrift-1.1.10.jar
b) libthrift-0.9.0.jar
c) spark-cassandra-connector_2.10-1.1.0-alpha3.jar
d) cassandra-all-2.1.0.jar
e) cassandra-clientutil-2.1.0.jar
f) cassandra-driver-core-2.1.0.jar

Have we missed any JAR file? Or is this the right way to connect Spark with Cassandra? Any guidance would be appreciated.

Sasi
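For reference, Helena's point about the single required dependency expressed as an sbt line; the version is the one cited elsewhere in this archive, so pick whichever release matches your Spark version:

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-alpha4"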
Re: Including jars in Spark-shell vs Spark-submit
Hi Harold,

It seems, based on your previous post, that you are using one version of the connector as a dependency yet building the assembly jar from master. You were using 1.1.0-alpha3 (you can upgrade to alpha4, beta coming this week) yet your assembly is spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.1.0",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-alpha3" withSources() withJavadoc(),
  "org.apache.spark" %% "spark-sql" % "1.1.0"
)

- Helena

On Oct 28, 2014, at 2:08 PM, Harold Nguyen har...@nexgate.com wrote:

Hi all,

The following works fine when submitting dependency jars through spark-shell:

./bin/spark-shell --master spark://ip-172-31-38-112:7077 --jars /home/ubuntu/spark-cassandra-connector/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar

But not through spark-submit:

./bin/spark-submit --class org.apache.spark.examples.streaming.CassandraSave --master spark://ip-172-31-38-112:7077 streaming-test/target/scala-2.10/simple-streaming_2.10-1.0.jar --jars local:///home/ubuntu/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar

Am I issuing the spark-submit command incorrectly? Each of the workers has that built jar in its respective directory (spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar).

Thanks,
Harold
Re: Including jars in Spark-shell vs Spark-submit
Ah, excellent! I will be sure to check whether we need to update our documentation based on your feedback :)

Cheers,
Helena

On Oct 28, 2014, at 3:03 PM, Harold Nguyen har...@nexgate.com wrote:

Hi Helena,

It's great to e-meet you! I've actually been following along your blogs and talks trying to get this to work. I just solved it, and you were absolutely correct. I've been using 1.1.0-alpha3 as my dependency, but my assembly is the 1.2.0-SNAPSHOT. Thanks for looking through all my other threads to piece together the problem.

So the solution is that I just included the spark-cassandra-connector-assembly-1.2.0-SNAPSHOT straight into a lib folder, removed the 1.1.0-alpha3 line from libraryDependencies, and re-packaged it. It all works great now, and I couldn't be happier. I had to piece together 6 different forums and sites to get that working (being absolutely new to Spark and Scala and sbt). I'll write a blog post on how to get this working later, in case it can help someone.

I really appreciate the help!

Harold
Re: PySpark and Cassandra 2.1 Examples
Nice!

- Helena
@helenaedelson

On Oct 29, 2014, at 12:01 PM, Mike Sukmanowsky mike.sukmanow...@gmail.com wrote:

Hey all,

Just thought I'd share this with the list in case anyone else would benefit. Currently working on a proper integration of PySpark and DataStax's new Cassandra-Spark connector, but that's ongoing. In the meanwhile, I've basically updated the cassandra_inputformat.py and cassandra_outputformat.py examples that come with Spark: https://github.com/Parsely/pyspark-cassandra

The new examples show reading and writing to Cassandra, including proper handling of CQL 3.1 collections: lists, sets and maps. I think it also clarifies the format RDDs are required to be in to write data to Cassandra, and provides a more general serializer to write Python (serialized via Py4J) structs to Cassandra.

Comments or questions are welcome. Will update the group again when we have support for the DataStax connector.

--
Mike Sukmanowsky
Aspiring Digital Carpenter
p: +1 (416) 953-4248
e: mike.sukmanow...@gmail.com
Re: Best way to partition RDD
Shahab,

Regardless, WRT Cassandra and Spark: when using the Spark Cassandra Connector, 'spark.cassandra.input.split.size' passed into the SparkConf configures the approximate number of Cassandra partitions in a Spark partition (default 10). No repartitioning should be necessary with what you have below, but I don't know if you are running on one node or a cluster.

This is a good initial guide:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#configuration-options-for-adjusting-reads
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraRDD.scala#L26-L37

Cheers,
Helena
@helenaedelson

On Oct 30, 2014, at 1:12 PM, Helena Edelson helena.edel...@datastax.com wrote:

Hi Shahab,
- How many Spark/Cassandra nodes are in your cluster?
- What is your deploy topology for the Spark and Cassandra clusters? Are they co-located?

- Helena
@helenaedelson

On Oct 30, 2014, at 12:16 PM, shahab shahab.mok...@gmail.com wrote:

Hi,

I am running an application in Spark which first loads data from Cassandra and then performs some map/reduce jobs.

val srdd = sqlContext.sql("select * from mydb.mytable")

I noticed that the srdd has only one partition, no matter how big the data loaded from Cassandra is. So I perform a repartition on the RDD and then run the map/reduce functions. But the main problem is that the repartition takes so much time (almost 2 min), which is not acceptable in my use case. Is there any better way to do the repartitioning?

best,
/Shahab
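As a reference point, a minimal sketch of passing that setting through SparkConf; the host and the value shown are placeholders, not a recommendation:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("CassandraRead")
  .set("spark.cassandra.connection.host", "127.0.0.1")
  // approximate number of Cassandra partitions per Spark partition; see the loading guide above
  .set("spark.cassandra.input.split.size", "10")
val sc = new SparkContext(conf)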
Re: Manipulating RDDs within a DStream
Hi Harold,

Can you include the versions of Spark and spark-cassandra-connector you are using? Thanks!

Helena
@helenaedelson

On Oct 30, 2014, at 12:58 PM, Harold Nguyen har...@nexgate.com wrote:

Hi all,

I'd like to be able to modify values in a DStream and then send it off to an external source like Cassandra, but I keep getting serialization errors and am not sure how to use the correct design pattern. I was wondering if you could help me.

I'd like to be able to do the following:

wordCounts.foreachRDD( rdd => {
  val arr = rdd.toArray
  ...
})

I would like to use the arr to send back to Cassandra, for instance like this:

val collection = sc.parallelize(Seq(arr.head._1, arr.head._2))
collection.saveToCassandra()

Or something like that, but as you know, I can't do this within the foreachRDD, only at the driver level. How do I use the arr variable to do something like that?

Thanks for any help,
Harold
Re: Accessing Cassandra with SparkSQL, Does not work?
Hi Shahab,

I'm just curious, are you explicitly needing to use thrift? Just using the connector with Spark does not require any thrift dependencies. Simply:

"com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-beta1"

But to your question: you declare the keyspace but also unnecessarily repeat keyspace.table in your select. Try this instead:

val cc = new CassandraSQLContext(sc)
cc.setKeyspace("keyspaceName")
val result = cc.sql("SELECT * FROM tableName")
etc.

- Helena
@helenaedelson

On Oct 31, 2014, at 1:25 PM, shahab shahab.mok...@gmail.com wrote:

Hi,

I am using the latest Cassandra-Spark Connector to access Cassandra tables from Spark. While I successfully managed to connect to Cassandra using CassandraRDD, the similar SparkSQL approach does not work. Here is my code for both methods:

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.catalyst.expressions._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.cassandra.CassandraSQLContext

val conf = new SparkConf().setAppName("SomethingElse")
  .setMaster("local")
  .set("spark.cassandra.connection.host", "localhost")
val sc: SparkContext = new SparkContext(conf)

val rdd = sc.cassandraTable("mydb", "mytable")  // this works

But:

val cc = new CassandraSQLContext(sc)
cc.setKeyspace("mydb")
val srdd: SchemaRDD = cc.sql("select * from mydb.mytable")
println("count : " + srdd.count)  // does not work

Exception is thrown:

Exception in thread "main" com.google.common.util.concurrent.UncheckedExecutionException: java.util.NoSuchElementException: key not found: mydb3.inverseeventtype
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
at com.google.common.cache.LocalCache.get(LocalCache.java:3934)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3938)

In fact mydb3 is another keyspace which I did not even try to connect to!

Any idea?

best,
/Shahab

Here is how my SBT looks:

libraryDependencies ++= Seq(
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-beta1" withSources() withJavadoc(),
  "org.apache.cassandra" % "cassandra-all" % "2.0.9" intransitive(),
  "org.apache.cassandra" % "cassandra-thrift" % "2.0.9" intransitive(),
  "net.jpountz.lz4" % "lz4" % "1.2.0",
  "org.apache.thrift" % "libthrift" % "0.9.1" exclude("org.slf4j", "slf4j-api") exclude("javax.servlet", "servlet-api"),
  "com.datastax.cassandra" % "cassandra-driver-core" % "2.0.4" intransitive(),
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided" exclude("org.apache.hadoop", "hadoop-core"),
  "org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided",
  "org.apache.hadoop" % "hadoop-client" % "1.0.4" % "provided",
  "com.github.nscala-time" %% "nscala-time" % "1.0.0",
  "org.scalatest" %% "scalatest" % "1.9.1" % "test",
  "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided",
  "org.apache.spark" %% "spark-hive" % "1.1.0" % "provided",
  "org.json4s" %% "json4s-jackson" % "3.2.5",
  "junit" % "junit" % "4.8.1" % "test",
  "org.slf4j" % "slf4j-api" % "1.7.7",
  "org.slf4j" % "slf4j-simple" % "1.7.7",
  "org.clapper" %% "grizzled-slf4j" % "1.0.2",
  "log4j" % "log4j" % "1.2.17")
Re: Manipulating RDDs within a DStream
Hi Harold,

Yes, that is the problem :) Sorry for the confusion, I will make this clear in the docs ;) since master is work for the next version.

All you need to do is use Spark 1.1.0 as you have it already, with

"com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-beta1"

and for the assembly - not from master - check out branch b1.1 and run: sbt ;clean ;reload ;assembly

Cheers,
- Helena
@helenaedelson

On Oct 31, 2014, at 1:35 PM, Harold Nguyen har...@nexgate.com wrote:

Hi Helena,

Thanks very much! I'm using Spark 1.1.0 and spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.

Best wishes,
Harold
Re: Manipulating RDDs within a DStream
Hi Harold,

This is a great use case, and here is how you could do it, for example, with Spark Streaming:

Using a Kafka stream:
https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L50

Save raw data to Cassandra from that stream:
https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L56

Do n computations on that streaming data: reading from Kafka, computing in Spark, and writing to Cassandra:
https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L69-L71

I hope that helps, and if not I'll dig up another.

- Helena
@helenaedelson

On Oct 31, 2014, at 1:37 PM, Harold Nguyen har...@nexgate.com wrote:

Thanks Lalit, and Helena,

What I'd like to do is manipulate the values within a DStream like this:

DStream.foreachRDD( rdd => {
  val arr = rdd.toArray
})

I'd then like to be able to insert results from the arr back into Cassandra, after I've manipulated the arr array. However, for all the examples I've seen, inserting into Cassandra is something like:

val collection = sc.parallelize(Seq(foo, bar))

where foo and bar could be elements in the arr array. So I would like to know how to insert into Cassandra at the worker level.

Best wishes,
Harold

On Thu, Oct 30, 2014 at 11:48 PM, lalit1303 la...@sigmoidanalytics.com wrote:

Hi,

Since the Cassandra object is not serializable, you can't open the connection at the driver level and access the object inside foreachRDD (i.e. at the worker level). You have to open the connection inside foreachRDD only, perform the operation and then close the connection. For example:

wordCounts.foreachRDD( rdd => {
  val arr = rdd.toArray
  // OPEN cassandra connection
  // store arr
  // CLOSE cassandra connection
})

Thanks
- Lalit Yadav
la...@sigmoidanalytics.com
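Pulling the thread together, a rough Scala sketch of the pattern the links above show: transform each micro-batch on the workers and let the connector write it, with no collect() on the driver. The keyspace, table and word-count source are hypothetical; the table is assumed to have "word" and "count" columns.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import com.datastax.spark.connector._   // adds saveToCassandra to RDDs

case class WordCount(word: String, count: Long)

val conf = new SparkConf()
  .setAppName("StreamToCassandra")
  .setMaster("local[2]")
  .set("spark.cassandra.connection.host", "127.0.0.1")
val ssc = new StreamingContext(conf, Seconds(1))

val wordCounts = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1L))
  .reduceByKey(_ + _)

// transform each micro-batch on the workers and write it; no collect() on the driver
wordCounts.foreachRDD { rdd =>
  rdd.map { case (word, count) => WordCount(word, count) }
     .saveToCassandra("mykeyspace", "wordcounts")
}

ssc.start()
ssc.awaitTermination()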
Re: Accessing Cassandra with SparkSQL, Does not work?
Hi Shahab,

The Apache Cassandra version looks great. I think that doing

cc.setKeyspace("mydb")
cc.sql("SELECT * FROM mytable")

versus

cc.setKeyspace("mydb")
cc.sql("select * from mydb.mytable")

is the problem? And if not, would you mind creating a ticket off-list for us to help further? You can create one here: https://github.com/datastax/spark-cassandra-connector/issues with tag: help wanted :)

Cheers,
- Helena
@helenaedelson

On Oct 31, 2014, at 1:59 PM, shahab shahab.mok...@gmail.com wrote:

Thanks Helena. I tried setting the keyspace, but I got the same result. I also removed other Cassandra dependencies, but still the same exception! I also tried to see whether this setting appears in the CassandraSQLContext or not, so I printed out the configuration:

val cc = new CassandraSQLContext(sc)
cc.setKeyspace("mydb")
cc.conf.getAll.foreach(f => println(f._1 + " : " + f._2))

printout:
spark.tachyonStore.folderName : spark-ec8ecb6a-1485-4d39-a93c-6f91711804a2
spark.driver.host : 192.168.1.111
spark.cassandra.connection.host : localhost
spark.cassandra.input.split.size : 1
spark.app.name : SomethingElse
spark.fileserver.uri : http://192.168.1.111:51463
spark.driver.port : 51461
spark.master : local

Does it have anything to do with the version of Apache Cassandra that I use? I use apache-cassandra-2.1.0.

best,
/Shahab

The shortened SBT:

"com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-beta1" withSources() withJavadoc(),
"net.jpountz.lz4" % "lz4" % "1.2.0",
"org.apache.spark" %% "spark-core" % "1.1.0" % "provided" exclude("org.apache.hadoop", "hadoop-core"),
"org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided",
"org.apache.hadoop" % "hadoop-client" % "1.0.4" % "provided",
"com.github.nscala-time" %% "nscala-time" % "1.0.0",
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
"org.apache.spark" %% "spark-sql" % "1.1.0" % "provided",
"org.apache.spark" %% "spark-hive" % "1.1.0" % "provided",
"org.json4s" %% "json4s-jackson" % "3.2.5",
"junit" % "junit" % "4.8.1" % "test",
"org.slf4j" % "slf4j-api" % "1.7.7",
"org.slf4j" % "slf4j-simple" % "1.7.7",
"org.clapper" %% "grizzled-slf4j" % "1.0.2",
"log4j" % "log4j" % "1.2.17"
Re: Cassandra spark connector exception: NoSuchMethodError: com.google.common.collect.Sets.newConcurrentHashSet()Ljava/util/Set;
Hi,

It looks like you are building from master (spark-cassandra-connector-assembly-1.2.0).

- Append this to your com.google.guava declaration: % "provided"
- Be sure your version of the connector dependency is the same as the assembly build. For instance, if you are using 1.1.0-beta1, build your assembly with that vs master.
- You can upgrade your version of Cassandra, if that is plausible for your deploy environment, to 2.1.0. Side note: we are releasing 1.1.0-beta2 today or tomorrow, which allows usage of Cassandra 2.1.1 and fixes any guava issues.
- Make your version of the Cassandra server + dependencies match your Cassandra driver version. You currently have 2.0.9 with 2.0.4.

- Helena
@helenaedelson

On Nov 11, 2014, at 6:13 AM, shahab shahab.mok...@gmail.com wrote:

Hi,

I have a Spark application which uses the Cassandra connector (spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar) to load data from Cassandra into Spark. Everything works fine in local mode, when I run it in my IDE. But when I submit the application to be executed on a standalone Spark server, I get the exception below (which is apparently related to Guava versions?!). Does anyone know how to solve this?

I create a jar file of my Spark application using assembly.bat, and the following are the dependencies I used. I put the connector (spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar) in the lib/ folder of my Eclipse project, which is why it is not included in the dependencies.

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-catalyst" % "1.1.0" % "provided",
  "org.apache.cassandra" % "cassandra-all" % "2.0.9" intransitive(),
  "org.apache.cassandra" % "cassandra-thrift" % "2.0.9" intransitive(),
  "net.jpountz.lz4" % "lz4" % "1.2.0",
  "org.apache.thrift" % "libthrift" % "0.9.1" exclude("org.slf4j", "slf4j-api") exclude("javax.servlet", "servlet-api"),
  "com.datastax.cassandra" % "cassandra-driver-core" % "2.0.4" intransitive(),
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided" exclude("org.apache.hadoop", "hadoop-core"),
  "org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided",
  "org.apache.hadoop" % "hadoop-client" % "1.0.4" % "provided",
  "com.github.nscala-time" %% "nscala-time" % "1.0.0",
  "org.scalatest" %% "scalatest" % "1.9.1" % "test",
  "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided",
  "org.apache.spark" %% "spark-hive" % "1.1.0" % "provided",
  "org.json4s" %% "json4s-jackson" % "3.2.5",
  "junit" % "junit" % "4.8.1" % "test",
  "org.slf4j" % "slf4j-api" % "1.7.7",
  "org.slf4j" % "slf4j-simple" % "1.7.7",
  "org.clapper" %% "grizzled-slf4j" % "1.0.2",
  "log4j" % "log4j" % "1.2.17",
  "com.google.guava" % "guava" % "16.0"
)

best,
/Shahab

And this is the exception I get:

Exception in thread "main" com.google.common.util.concurrent.ExecutionError: java.lang.NoSuchMethodError: com.google.common.collect.Sets.newConcurrentHashSet()Ljava/util/Set;
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2261)
at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004)
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
at org.apache.spark.sql.cassandra.CassandraCatalog.lookupRelation(CassandraCatalog.scala:39)
at org.apache.spark.sql.cassandra.CassandraSQLContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(CassandraSQLContext.scala:60)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:123)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:123)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:123)
at org.apache.spark.sql.cassandra.CassandraSQLContext$$anon$2.lookupRelation(CassandraSQLContext.scala:65)
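For reference, the scoping Helena suggests expressed as an sbt line, keeping the guava 16.0 version from the original build:

// scope guava as provided so the application assembly does not ship its own copy
// that clashes with the one Spark and the connector already bring in
libraryDependencies += "com.google.guava" % "guava" % "16.0" % "provided"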
Re: Spark streaming cannot receive any message from Kafka
I encounter no issues with streaming from Kafka to Spark in 1.1.0. Do you perhaps have a version conflict?

Helena

On Nov 13, 2014 12:55 AM, Jay Vyas jayunit100.apa...@gmail.com wrote:

Yup, very important that n > 1 for Spark Streaming jobs: if local, use local[2]. The thing to remember is that your Spark receiver will take a thread to itself and produce data, so you need another thread to consume it.

In a cluster manager like YARN or Mesos, the word "thread" is not used anymore, I guess it has a different meaning - you need 2 or more free compute slots, and that should be guaranteed by looking to see how many free node managers are running, etc.

On Nov 12, 2014, at 7:53 PM, Shao, Saisai saisai.s...@intel.com wrote:

Did you configure the Spark master as "local"? It should be local[n], n > 1, for local mode. Besides, there's a Kafka wordcount example in the Spark Streaming examples, you can try that. I've tested with the latest master, it's OK.

Thanks,
Jerry

From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: Thursday, November 13, 2014 8:45 AM
To: Bill Jay
Cc: u...@spark.incubator.apache.org
Subject: Re: Spark streaming cannot receive any message from Kafka

Bill,

"However, when I am currently using Spark 1.1.0, the Spark streaming job cannot receive any messages from Kafka. I have not made any change to the code."

Do you see any suspicious messages in the log output?

Tobias
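A minimal sketch of the local-mode point above; the app name and batch interval are arbitrary:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// "local" gives a single thread, which a receiver-based stream consumes entirely;
// use local[n] with n > 1 (or a cluster with enough free cores)
val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCount")
val ssc  = new StreamingContext(conf, Seconds(2))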
Re: Spark-Streaming: output to cassandra
You can just do something like this; the Spark Cassandra Connector handles the rest:

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Map(KafkaTopicRaw -> 10), StorageLevel.DISK_ONLY_2)
  .map { case (_, line) => line.split(",") }
  .map(RawWeatherData(_))
  .saveToCassandra(CassandraKeyspace, CassandraTableRaw)

- Helena
@helenaedelson

On Dec 4, 2014, at 9:51 AM, m.sar...@accenture.com wrote:

Hi,

I have written the code below, which is streaming data from Kafka and printing it to the console. I want to extend this and have my data go into a Cassandra table instead.

JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(1000));
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
System.out.println("Connection done!");

JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() {
    public String call(Tuple2<String, String> message) {
        return message._2();
    }
});

// data.print();   --> output to console
data.foreachRDD(saveToCassandra("mykeyspace", "mytable"));
jssc.start();
jssc.awaitTermination();

How should I implement the line

data.foreachRDD(saveToCassandra("mykeyspace", "mytable"));

so that data goes into Cassandra in each batch? And how do I specify a batch, because if I do Ctrl+C on the console of the streaming job jar, nothing will be entered into Cassandra for sure since it is getting killed.

Please help.

Thanks and Regards,
Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #: (+91) - 9836112841.
Re: Spark-Streaming: output to cassandra
I think what you are looking for is something like:

JavaRDD<Double> pricesRDD = javaFunctions(sc).cassandraTable("ks", "tab", mapColumnTo(Double.class)).select("price");
JavaRDD<Person> rdd = javaFunctions(sc).cassandraTable("ks", "people", mapRowTo(Person.class));

noted here: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md ?

- Helena
@helenaedelson

On Dec 5, 2014, at 10:15 AM, m.sar...@accenture.com wrote:

Hi Akhil, Vyas, Helena,

Thank you for your suggestions.

As Akhil suggested earlier, I have implemented the batch Duration into JavaStreamingContext and waitForTermination(Duration). The approach Helena suggested is Scala oriented. But the issue now is that I want to set Cassandra as my output. I have created a table in Cassandra, "test_table", with columns "key: text primary key" and "value: text". I have mapped the data successfully into JavaDStream<Tuple2<String,String>> data:

JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

JavaDStream<Tuple2<String,String>> data = messages.map(new Function<Tuple2<String,String>, Tuple2<String,String>>() {
    public Tuple2<String,String> call(Tuple2<String, String> message) {
        return new Tuple2<String,String>(message._1(), message._2());
    }
});

Then I have created a List:

List<TestTable> list = new ArrayList<TestTable>();

where TestTable is my custom class having the same structure as my Cassandra table, with members "key" and "value":

class TestTable {
    String key;
    String val;

    public TestTable() {}
    public TestTable(String k, String v) { key = k; val = v; }

    public String getKey() { return key; }
    public void setKey(String k) { key = k; }
    public String getVal() { return val; }
    public void setVal(String v) { val = v; }

    public String toString() { return "Key:" + key + ",Val:" + val; }
}

Please suggest a way to add the data from JavaDStream<Tuple2<String,String>> data into the List<TestTable> list. I am doing this so that I can subsequently use

JavaRDD<TestTable> rdd = sc.parallelize(list);
javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table");

to save the RDD data into Cassandra.

I had tried coding this way:

messages.foreachRDD(new Function<Tuple2<String,String>, String>() {
    public List<TestTable> call(Tuple2<String,String> message) {
        String k = message._1();
        String v = message._2();
        TestTable tbl = new TestTable(k, v);
        list.put(tbl);
    }
});

but it seems some type mismatch is happening.

Please help.

Thanks and Regards,
Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #: (+91) - 9836112841.
Re: Error: Spark-streaming to Cassandra
I am curious why you use the 1.0.4 java artifact with the latest 1.1.0? This might be your compilation problem - the older java version.

<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector_2.10</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector-java_2.10</artifactId>
  <version>1.0.4</version>
</dependency>

See:
- doc: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md
- mvn repo: http://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector-java_2.10/1.1.0

- Helena
@helenaedelson

On Dec 8, 2014, at 12:47 PM, m.sar...@accenture.com wrote:

Hi,

I am intending to save the streaming data from Kafka into Cassandra, using spark-streaming. But there seems to be a problem with the line

javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();

I am getting 2 errors. The code, the error log and the POM.xml dependencies are listed below. Please help me find the reason why this is happening.

public class SparkStream {
    static int key = 0;

    public static void main(String args[]) throws Exception {
        if (args.length != 3) {
            System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
            System.exit(1);
        }

        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topic = args[2].split(",");
        for (String t : topic) {
            topicMap.put(t, new Integer(3));
        }

        /* Connection to Spark */
        SparkConf conf = new SparkConf();
        JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));

        /* connection to cassandra */
        /* conf.set("spark.cassandra.connection.host", "127.0.0.1:9042");
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());
        Session session = connector.openSession();
        session.execute("CREATE TABLE IF NOT EXISTS testkeyspace.test_table (key INT PRIMARY KEY, value TEXT)");
        */

        /* Receive Kafka streaming inputs */
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        /* Create DStream */
        JavaDStream<TestTable> data = messages.map(new Function<Tuple2<String, String>, TestTable>() {
            public TestTable call(Tuple2<String, String> message) {
                return new TestTable(new Integer(++key), message._2());
            }
        });

        /* Write to cassandra */
        javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();
        // data.print();

        jssc.start();
        jssc.awaitTermination();
    }
}

class TestTable implements Serializable {
    Integer key;
    String value;

    public TestTable() {}
    public TestTable(Integer k, String v) { key = k; value = v; }

    public Integer getKey() { return key; }
    public void setKey(Integer k) { key = k; }
    public String getValue() { return value; }
    public void setValue(String v) { value = v; }

    public String toString() {
        return MessageFormat.format("TestTable'{'key={0}, value={1}'}'", key, value);
    }
}

The output log is:

[INFO] Compiling 1 source file to /root/Documents/SparkStreamSample/target/classes
[INFO] 2 errors
[ERROR] COMPILATION ERROR :
[ERROR] /root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[76,81] cannot find symbol
  symbol: method mapToRow(java.lang.Class<com.spark.TestTable>)
  location: class com.spark.SparkStream
[ERROR] /root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[76,17] no suitable method found for javaFunctions(org.apache.spark.streaming.api.java.JavaDStream<com.spark.TestTable>)
  method com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.streaming.api.java.JavaDStream<T>, java.lang.Class<T>) is not applicable
    (cannot infer type-variable(s) T (actual and formal argument lists differ in length))
  method com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.streaming.dstream.DStream<T>, java.lang.Class<T>) is not applicable
    (cannot infer type-variable(s) T (actual and formal argument lists differ in length))
  method com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.api.java.JavaRDD<T>, java.lang.Class<T>) is not applicable
Re: JSON Input files
One solution can be found here: https://spark.apache.org/docs/1.1.0/sql-programming-guide.html#json-datasets

- Helena
@helenaedelson

On Dec 13, 2014, at 11:18 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote:

Hi Team,

I have a large JSON file in Hadoop. Could you please let me know:

1. How to read the JSON file
2. How to parse the JSON file

Please share any example program based on Scala.

Regards,
Rajesh
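For reference, a minimal sketch of what that guide describes, assuming Spark 1.1, an existing SparkContext named sc, and a hypothetical HDFS path and field name:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// one JSON object per line, read directly from HDFS; the schema is inferred
val people = sqlContext.jsonFile("hdfs:///data/people.json")
people.printSchema()

people.registerTempTable("people")
sqlContext.sql("SELECT name FROM people").collect().foreach(println)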
Re: Spark streaming alerting
Streaming _from_ Cassandra, CassandraInputDStream, is coming BTW: https://issues.apache.org/jira/browse/SPARK-6283. I am working on it now.

Helena
@helenaedelson

On Mar 23, 2015, at 5:22 AM, Khanderao Kand Gmail khanderao.k...@gmail.com wrote:

Akhil,

You are right in your answer to what Mohit wrote. However, what Mohit seems to be alluding to, but did not write properly, might be different.

Mohit, you are wrong in saying that streaming generally works on HDFS and Cassandra. Streaming typically works with a streaming or queuing source like Kafka, Kinesis, Twitter, Flume, ZeroMQ, etc. (but can also read from HDFS and S3). However, the streaming context (the receiver within the streaming context) gets events/messages/records and forms a time-window-based batch (RDD). So there is a maximum gap of the window time between when an alert message was available to Spark and when the processing happens. I think you meant this. As per the Spark programming model, the RDD is the right way to deal with the data.

If you are fine with the minimum delay of, say, a second (based on the minimum time window that DStreaming can support), then what Rohit gave is a right model.

Khanderao

On Mar 22, 2015, at 11:39 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

What do you mean you can't send it directly from Spark workers? Here's a simple approach which you could do:

val data = ssc.textFileStream("sigmoid/")
val dist = data.filter(_.contains("ERROR")).foreachRDD(rdd => alert("Errors : " + rdd.count()))

And the alert() function could be anything triggering an email or sending an SMS alert.

Thanks
Best Regards

On Sun, Mar 22, 2015 at 1:52 AM, Mohit Anchlia mohitanch...@gmail.com wrote:

Is there a module in Spark Streaming that lets you listen to the alerts/conditions as they happen in the streaming module? Generally Spark Streaming components will execute on a large set of clusters like HDFS or Cassandra; however, when it comes to alerting, you generally can't send it directly from the Spark workers, which means you need a way to listen to the alerts.
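To make Akhil's snippet concrete, a minimal sketch assuming an existing StreamingContext ssc; alert() here is a hypothetical stand-in for whatever notification channel you actually use:

// hypothetical alert sink; in practice it could send an email or an SMS as Akhil describes
def alert(msg: String): Unit = println("ALERT: " + msg)

val data = ssc.textFileStream("sigmoid/")
data.filter(_.contains("ERROR")).foreachRDD { rdd =>
  val errors = rdd.count()                      // count runs on the cluster, result returns to the driver
  if (errors > 0) alert("Errors : " + errors)   // the alert fires on the driver, once per batch
}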
Re: Spark streaming alerting
I created a jira ticket for my work in both the Spark and spark-cassandra-connector JIRAs; I don't know why you cannot see them. Users can stream from any Cassandra table, just as one can stream from a Kafka topic; same principle.

Helena
@helenaedelson

On Mar 24, 2015, at 11:29 AM, Anwar Rizal anriza...@gmail.com wrote:

Helena,

The CassandraInputDStream sounds interesting. I don't find many things in the jira though. Do you have more details on what it tries to achieve?

Thanks,
Anwar.
Re: How to parse Json formatted Kafka message in spark streaming
Hi Cui,

What version of Spark are you using? There was a bug ticket that may be related to this, fixed in core/src/main/scala/org/apache/spark/rdd/RDD.scala, that is merged into versions 1.3.0 and 1.2.1. If you are using 1.1.1 that may be the reason, but it's a stretch: https://issues.apache.org/jira/browse/SPARK-4968

Did you verify that you have data streaming from Kafka?

Helena
https://twitter.com/helenaedelson

On Mar 5, 2015, at 12:43 AM, Cui Lin cui@hds.com wrote:

Friends,

I'm trying to parse JSON formatted Kafka messages and then send them back to Cassandra. I have two problems:

1. I got the exception below. How to check for an empty RDD?

Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)

val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)
messages.foreachRDD { rdd =>
  val message: RDD[String] = rdd.map { y => y._2 }
  sqlContext.jsonRDD(message).registerTempTable("tempTable")
  sqlContext.sql("SELECT time,To FROM tempTable")
    .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
}

2. How to get all column names from the JSON messages? I have hundreds of columns in the JSON formatted message.

Thanks for your help!

Best regards,
Cui Lin
Re: How to parse Json formatted Kafka message in spark streaming
Great point :)

Cui, here's a cleaner way than I had before, without the use of Spark SQL for the mapping:

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafka.kafkaParams, Map("github" -> 5), StorageLevel.MEMORY_ONLY)
  .map { case (k, v) => JsonParser.parse(v).extract[MonthlyCommits] }
  .saveToCassandra("githubstats", "monthly_commits")

HELENA EDELSON
Senior Software Engineer, DSE Analytics

On Mar 5, 2015, at 9:33 AM, Ted Yu yuzhih...@gmail.com wrote:

Cui:
You can check messages.partitions.size to determine whether messages is an empty RDD.

Cheers

On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

When you use KafkaUtils.createStream with StringDecoders, it will return String objects inside your messages stream. To access the elements from the JSON, you could do something like the following:

val mapStream = messages.map(x => {
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue[Map[String, Any]](x).get("time")
})

Thanks
Best Regards
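Tying Ted's suggestion back to the original snippet, a rough sketch of guarding each batch against the empty-collection error; messages and sqlContext are as defined in the original code above, and whether take(1) or a partition-count check is the better guard is a judgment call:

import com.datastax.spark.connector._   // for SomeColumns and saveToCassandra

// messages and sqlContext as in the original snippet
messages.foreachRDD { rdd =>
  val message = rdd.map { case (_, v) => v }
  // jsonRDD infers the schema with a reduce(), which throws on an empty collection,
  // so skip batches that carry no data
  if (message.take(1).nonEmpty) {
    sqlContext.jsonRDD(message).registerTempTable("tempTable")
    sqlContext.sql("SELECT time,To FROM tempTable")
      .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
  }
}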
Re: Grouping and storing unordered time series data stream to HDFS
Consider using Cassandra with Spark Streaming for time series; Cassandra has been doing time series for years. Here are some snippets with Kafka streaming and writing/reading the data back:

https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L62-L64

or write in the stream, read back:
https://github.com/killrweather/killrweather/blob/master/killrweather-examples/src/main/scala/com/datastax/killrweather/KafkaStreamingJson2.scala#L53-L61

or more detailed reads back:
https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/TemperatureActor.scala#L65-L69

A CassandraInputDStream is coming, I'm working on it now.

Helena
@helenaedelson

On May 15, 2015, at 9:59 AM, ayan guha guha.a...@gmail.com wrote:

Hi,

Do you have a cut-off time, like how late an event can be? If not, you may consider a different persistent store like Cassandra/HBase and delegate the "update" part to it.

On Fri, May 15, 2015 at 8:10 PM, Nisrina Luthfiyati nisrina.luthfiy...@gmail.com wrote:

Hi all,

I have a stream of data from Kafka that I want to process and store in HDFS using Spark Streaming. Each record has a date/time dimension and I want to write data within the same time dimension to the same HDFS directory. The data stream might be unordered (by time dimension). I'm wondering what the best practices are for grouping/storing time series data streams using Spark Streaming.

I'm considering grouping each batch of data in Spark Streaming by time dimension and then saving each group to a different HDFS directory. However, since it is possible for data with the same time dimension to be in different batches, I would need to handle updates in case the HDFS directory already exists. Is this a common approach? Are there any other approaches that I can try?

Thank you!
Nisrina.

--
Best Regards,
Ayan Guha
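A rough sketch of the Cassandra-based approach Helena and Ayan describe, in Scala; the keyspace, table and record layout are hypothetical, and the point is only that keying rows by their time dimension lets late data arrive as ordinary writes:

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.streaming.dstream.DStream
import com.datastax.spark.connector.streaming._   // adds saveToCassandra to DStreams

// hypothetical record: the day is the partition key, so all events of one day land together
case class Event(day: String, eventTime: Long, payload: String)

def persist(events: DStream[(Long, String)]): Unit = {
  def toDay(millis: Long): String = new SimpleDateFormat("yyyy-MM-dd").format(new Date(millis))

  events
    .map { case (ts, payload) => Event(toDay(ts), ts, payload) }
    // late events for an already-seen day are simply more writes to that partition,
    // so out-of-order arrival needs no directory rewriting
    .saveToCassandra("timeseries_ks", "events")
}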
Re: Exception writing on two cassandra tables NoHostAvailableException: All host(s) tried for query failed (no host was tried)
Hi Antonio,

First, what version of the Spark Cassandra Connector are you using? You are using Spark 1.3.1, which the Cassandra connector today supports in builds from the master branch only - the release with public artifacts supporting Spark 1.3.1 is coming soon ;) Please see https://github.com/datastax/spark-cassandra-connector#version-compatibility

Try the version change and LMK. What does your Cassandra log say?

Note that you can read from a Spark stream, like Flume, in your flumeStreamNavig.map(..) code (in Scala at least, with a lot less code - I have not used Java). Here it's Kafka:
https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L39

And write inline to Cassandra:
https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L45
https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L64

Helena
tw: @helenaedelson

On May 29, 2015, at 6:11 AM, Antonio Giambanco antogia...@gmail.com wrote:

Hi all,

I have spark 1.3.1 and cassandra 2.0.14 installed on a single server. I'm coding a simple java class for Spark Streaming as follows:
- read header events from a flume sink
- based on the header, write the event body to the navigation or transaction table (cassandra)

Unfortunately I get NoHostAvailableException; if I comment out the code for saving one of the two tables, everything works. Here is the code:

public static void main(String[] args) {
    // Create a local StreamingContext with two working threads and batch interval of 1 second
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("DWXNavigationApp");
    conf.set("spark.cassandra.connection.host", "127.0.0.1");
    conf.set("spark.cassandra.connection.native.port", "9042");
    conf.set("spark.cassandra.output.batch.size.rows", "1");
    conf.set("spark.cassandra.output.concurrent.writes", "1");

    final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

    JavaReceiverInputDStream<SparkFlumeEvent> flumeStreamNavig = FlumeUtils.createPollingStream(jssc, "127.0.0.1", );

    JavaDStream<String> logRowsNavig = flumeStreamNavig.map(
        new Function<SparkFlumeEvent, String>() {
            @Override
            public String call(SparkFlumeEvent arg0) throws Exception {
                // TODO Auto-generated method stub
                Map<CharSequence, CharSequence> headers = arg0.event().getHeaders();
                ByteBuffer bytePayload = arg0.event().getBody();
                String s = headers.get("source_log").toString() + "#" + new String(bytePayload.array());
                System.out.println("RIGA: " + s);
                return s;
            }
        });

    logRowsNavig.foreachRDD(
        new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> rows) throws Exception {
                if (!rows.isEmpty()) {
                    // String header = getHeaderFronRow(rows.collect());
                    List<Navigation> listNavigation = new ArrayList<Navigation>();
                    List<Transaction> listTransaction = new ArrayList<Transaction>();

                    for (String row : rows.collect()) {
                        String header = row.substring(0, row.indexOf("#"));
                        if (header.contains("controller_log")) {
                            listNavigation.add(createNavigation(row));
                            System.out.println("Added Element in Navigation List");
                        } else if (header.contains("business_log")) {
Re: Exception writing on two cassandra tables NoHostAvailableException: All host(s) tried for query failed (no host was tried)
) at com.datastax.driver.core.AbstractSession.prepareAsync(AbstractSession.java:103)
at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:89)
... 24 more

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

15/06/01 16:43:30 WARN TaskSetManager: Lost task 1.0 in stage 61.0 (TID 82, localhost): org.apache.spark.TaskKilledException
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)

A G