Re: How to parse Json formatted Kafka message in spark streaming
Hi Ted,

Thanks for your reply. I noticed from the link below that partitions.size will not work for checking whether an RDD in a stream is empty. It seems the problem is solved in Spark 1.3, which is not yet available for download?

https://issues.apache.org/jira/browse/SPARK-5270

Best regards,
Cui Lin

From: Ted Yu <yuzhih...@gmail.com>
Date: Thursday, March 5, 2015 at 6:33 AM
To: Akhil Das <ak...@sigmoidanalytics.com>
Cc: Cui Lin <cui@hds.com>, "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: How to parse Json formatted Kafka message in spark streaming
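For anyone stuck on a pre-1.3 Spark, a common workaround (not from this thread, just a sketch) is to probe each batch RDD with take(1) before handing it to jsonRDD, since RDD#isEmpty only arrives with SPARK-5270. The names messages and sqlContext below follow the code quoted earlier in the thread:

```scala
// Sketch of a pre-Spark-1.3 guard against empty micro-batches.
// Assumes `messages` (a Kafka DStream of (key, value) pairs) and
// `sqlContext` are defined as in the thread. take(1) launches a small
// job but avoids the "empty collection" reduce inside jsonRDD's
// schema inference.
messages.foreachRDD { rdd =>
  val payloads = rdd.map(_._2)
  if (payloads.take(1).nonEmpty) {  // RDD#isEmpty exists only from 1.3 on
    sqlContext.jsonRDD(payloads).registerTempTable("tempTable")
  }
}
```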
Re: How to parse Json formatted Kafka message in spark streaming
See the following thread for the 1.3.0 release: http://search-hadoop.com/m/JW1q5hV8c4

Looks like the release is around the corner.

On Thu, Mar 5, 2015 at 3:26 PM, Cui Lin <cui@hds.com> wrote:
Re: How to parse Json formatted Kafka message in spark streaming
When you use KafkaUtils.createStream with StringDecoders, it will return String objects inside your messages stream. To access the elements of the JSON, you could do something like the following:

    val mapStream = messages.map { x =>
      val mapper = new ObjectMapper() with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)
      mapper.readValue[Map[String, Any]](x).get("time")
    }

Thanks
Best Regards

On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cui@hds.com> wrote:
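Building on that snippet, the same parsed Map can also answer the second question in the thread (listing field names): calling keySet on the result yields the top-level JSON keys. A sketch, assuming jackson-module-scala is on the classpath (the package of ScalaObjectMapper varies between versions of that module):

```scala
// Sketch: reuse the Jackson mapper from above to list top-level field
// names of one JSON message. Nested objects would need recursion;
// this only returns the outermost keys.
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper  // package differs by version

val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)

def topLevelKeys(json: String): Set[String] =
  mapper.readValue[Map[String, Any]](json).keySet
```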
Re: How to parse Json formatted Kafka message in spark streaming
Hi Cui,

What version of Spark are you using? There is a bug ticket that may be related to this, fixed in core/src/main/scala/org/apache/spark/rdd/RDD.scala and merged into versions 1.3.0 and 1.2.1. If you are using 1.1.1 that may be the reason, but it's a stretch: https://issues.apache.org/jira/browse/SPARK-4968

Did you verify that you have data streaming from Kafka?

Helena
https://twitter.com/helenaedelson

On Mar 5, 2015, at 12:43 AM, Cui Lin <cui@hds.com> wrote:
Re: How to parse Json formatted Kafka message in spark streaming
Cui:

You can check messages.partitions.size to determine whether messages is an empty RDD.

Cheers

On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
Re: How to parse Json formatted Kafka message in spark streaming
Great point :)

Cui, here's a cleaner way than I had before, without the use of Spark SQL for the mapping:

    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafka.kafkaParams, Map("github" -> 5), StorageLevel.MEMORY_ONLY)
      .map { case (k, v) => JsonParser.parse(v).extract[MonthlyCommits] }
      .saveToCassandra("githubstats", "monthly_commits")

HELENA EDELSON
Senior Software Engineer, DSE Analytics

On Mar 5, 2015, at 9:33 AM, Ted Yu <yuzhih...@gmail.com> wrote:
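Helena's snippet presupposes a target case class and json4s implicit formats in scope, neither of which appears in the thread. A hypothetical sketch of those missing pieces (the field names are illustrative guesses, not from the thread):

```scala
// Hypothetical supporting definitions for the json4s extraction above.
// Assumes json4s-native on the classpath.
import org.json4s._
import org.json4s.native.JsonParser

// Shape is a guess; it must mirror the JSON being parsed.
case class MonthlyCommits(user: String, commits: Int)

// extract[...] requires an implicit Formats instance in scope.
implicit val formats: Formats = DefaultFormats
```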
How to parse Json formatted Kafka message in spark streaming
Friends,

I'm trying to parse JSON-formatted Kafka messages and then send them back to Cassandra. I have two problems:

1. I got the exception below. How do I check for an empty RDD?

    Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
        at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
        at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
        at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)

    val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)
    messages.foreachRDD { rdd =>
      val message: RDD[String] = rdd.map { y => y._2 }
      sqlContext.jsonRDD(message).registerTempTable("tempTable")
      sqlContext.sql("SELECT time, To FROM tempTable")
        .saveToCassandra("cassandra_keyspace", "cassandra_table", SomeColumns("key", "msg"))
    }

2. How do I get all column names from the JSON messages? I have hundreds of columns in the JSON-formatted message.

Thanks for your help!

Best regards,
Cui Lin
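On the second question: once jsonRDD has inferred a schema, the column names can be read off that schema rather than listed by hand. A sketch, assuming the same sqlContext and a non-empty batch, with API names as they stood in Spark 1.2/1.3:

```scala
// Sketch: let Spark SQL's schema inference enumerate the columns.
// `message: RDD[String]` is the RDD of JSON strings from the code above.
val parsed = sqlContext.jsonRDD(message)
val columnNames: Array[String] = parsed.schema.fieldNames  // top-level columns
parsed.printSchema()  // or inspect the full, possibly nested, schema
```

Note that schema inference itself triggers a job over the data, which is why it fails on an empty batch; any empty-RDD guard has to run before this call.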