Starting in Spark 1.4, there is also an explode function that you can use directly from the select clause (much like in HiveQL):
import org.apache.spark.sql.functions._

df.select(explode($"entities.user_mentions").as("mention"))

Unlike standard HiveQL, you can also include other attributes in the select, or even $"*".

On Wed, Jun 24, 2015 at 8:34 AM, Yin Huai <yh...@databricks.com> wrote:

> The function accepted by explode is f: Row => TraversableOnce[A]. It seems
> user_mentions is an array of structs. So, can you change your
> pattern matching to the following?
>
> case Row(rows: Seq[_]) => rows.asInstanceOf[Seq[Row]].map(elem => ...)
>
> On Wed, Jun 24, 2015 at 5:27 AM, Gustavo Arjones <garjo...@socialmetrix.com> wrote:
>
>> Hi All,
>>
>> I am using the new *Apache Spark version 1.4.0 DataFrames API* to
>> extract information from Twitter's Status JSON, mostly focused on the
>> Entities Object <https://dev.twitter.com/overview/api/entities> - the
>> relevant part for this question is shown below:
>>
>> {
>>   ...
>>   ...
>>   "entities": {
>>     "hashtags": [],
>>     "trends": [],
>>     "urls": [],
>>     "user_mentions": [
>>       {
>>         "screen_name": "linobocchini",
>>         "name": "Lino Bocchini",
>>         "id": 187356243,
>>         "id_str": "187356243",
>>         "indices": [ 3, 16 ]
>>       },
>>       {
>>         "screen_name": "jeanwyllys_real",
>>         "name": "Jean Wyllys",
>>         "id": 111123176,
>>         "id_str": "111123176",
>>         "indices": [ 79, 95 ]
>>       }
>>     ],
>>     "symbols": []
>>   },
>>   ...
>>   ...
>> }
>>
>> There are several examples of how to extract information from primitive
>> types such as string, integer, etc. - but I couldn't find anything on how
>> to process this kind of *complex* structure.
>>
>> I tried the code below, but it still doesn't work; it throws an
>> Exception:
>>
>> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>
>> val tweets = sqlContext.read.json("tweets.json")
>>
>> // this function is just to filter empty entities.user_mentions[] nodes
>> // some tweets don't contain any mentions
>> import org.apache.spark.sql.functions.udf
>> val isEmpty = udf((value: List[Any]) => value.isEmpty)
>>
>> import org.apache.spark.sql._
>> import sqlContext.implicits._
>> case class UserMention(id: Long, idStr: String, indices: Array[Long],
>>   name: String, screenName: String)
>>
>> val mentions = tweets.select("entities.user_mentions").
>>   filter(!isEmpty($"user_mentions")).
>>   explode($"user_mentions") {
>>     case Row(arr: Array[Row]) => arr.map { elem =>
>>       UserMention(
>>         elem.getAs[Long]("id"),
>>         elem.getAs[String]("is_str"),
>>         elem.getAs[Array[Long]]("indices"),
>>         elem.getAs[String]("name"),
>>         elem.getAs[String]("screen_name"))
>>     }
>>   }
>>
>> mentions.first
>>
>> Exception when I try to call mentions.first:
>>
>> scala> mentions.first
>> 15/06/23 22:15:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
>> scala.MatchError: [List([187356243,187356243,List(3, 16),Lino
>> Bocchini,linobocchini], [111123176,111123176,List(79, 95),Jean
>> Wyllys,jeanwyllys_real])] (of class
>> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
>>   at
>> $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
>>   at
>> $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
>>   at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
>>   at
>> org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:81)
>>
>> What is wrong here? I understand it is related to the types, but I
>> couldn't figure it out yet.
>>
>> As additional context, the structure mapped automatically is:
>>
>> scala> mentions.printSchema
>> root
>>  |-- user_mentions: array (nullable = true)
>>  |    |-- element: struct (containsNull = true)
>>  |    |    |-- id: long (nullable = true)
>>  |    |    |-- id_str: string (nullable = true)
>>  |    |    |-- indices: array (nullable = true)
>>  |    |    |    |-- element: long (containsNull = true)
>>  |    |    |-- name: string (nullable = true)
>>  |    |    |-- screen_name: string (nullable = true)
>>
>> *NOTE 1:* I know it is possible to solve this using HiveQL, but I would
>> like to use DataFrames since there is so much momentum around them.
>>
>> SELECT explode(entities.user_mentions) as mentions
>> FROM tweets
>>
>> *NOTE 2:* the *UDF* val isEmpty = udf((value: List[Any]) => value.isEmpty)
>> is an ugly hack and I'm missing something here, but it was the only way I
>> came up with to avoid an NPE.
>>
>> I've posted the same question on SO:
>> http://stackoverflow.com/questions/31016156/how-to-extract-complex-json-structures-using-apache-spark-1-4-0-data-frames
>>
>> Thanks all!
>> - gustavo
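
[Editorial note for readers of the archive: below is a minimal, untested sketch that combines the two replies above against the schema Gustavo posted. It assumes the spark-shell sqlContext and the same tweets.json; the "mention" alias and the trimmed UserMention case class are illustrative, not part of the thread.]

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import sqlContext.implicits._

val tweets = sqlContext.read.json("tweets.json")

// 1) The Spark 1.4 select-clause explode, keeping the other top-level columns
//    alongside the exploded element. Empty or null user_mentions arrays simply
//    yield no output rows here, so the isEmpty UDF hack should not be needed.
val mentions = tweets
  .select($"*", explode($"entities.user_mentions").as("mention"))
  .select($"mention.screen_name", $"mention.name")

// 2) DataFrame.explode with the pattern match Yin suggested: the array of
//    structs arrives as a Seq[Row], not an Array[Row]. Fields are read by
//    position following the printSchema order (id, id_str, indices, name,
//    screen_name); "indices" is omitted to keep the case class small.
case class UserMention(id: Long, idStr: String, name: String, screenName: String)

val mentions2 = tweets
  .select($"entities.user_mentions".as("user_mentions"))
  .explode($"user_mentions") {
    case Row(rows: Seq[_]) =>
      rows.asInstanceOf[Seq[Row]].map { elem =>
        UserMention(
          elem.getLong(0),    // id
          elem.getString(1),  // id_str (the original snippet had "is_str")
          elem.getString(3),  // name
          elem.getString(4))  // screen_name
      }
    case _ => Seq.empty[UserMention]  // tweets with null user_mentions yield no rows
  }

mentions.show()
mentions2.show()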