Starting in Spark 1.4 there is also an explode that you can use directly
from the select clause (much like in HiveQL):

import org.apache.spark.sql.functions._
df.select(explode($"entities.user_mentions").as("mention"))

Unlike standard HiveQL, you can also include other attributes in the select
or even $"*".
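
As a minimal sketch of that combination (not taken from the original
thread), the snippet below selects a top-level column next to the exploded
array. It assumes Spark 1.4's SQLContext running in local mode. The object
name ExplodeMentionsSketch, the one-tweet sample, and its top-level "id"
field are hypothetical and exist only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.explode

object ExplodeMentionsSketch {
  // Hypothetical sample: one tweet with a made-up top-level "id"
  // and a trimmed "entities" object.
  val sampleTweet: String =
    """{"id": 1, "entities": {"user_mentions": [""" +
    """{"screen_name": "linobocchini", "name": "Lino Bocchini"}, """ +
    """{"screen_name": "jeanwyllys_real", "name": "Jean Wyllys"}]}}"""

  def run(): Seq[(Long, String)] = {
    val conf = new SparkConf().setMaster("local[1]").setAppName("explode-sketch")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val df = sqlContext.read.json(sc.parallelize(Seq(sampleTweet)))

      // One output row per element of user_mentions;
      // the "id" column is repeated on each of those rows.
      val mentions =
        df.select($"id", explode($"entities.user_mentions").as("mention"))

      // Fields of the exploded struct are reachable with dot notation.
      mentions
        .select($"id", $"mention.screen_name")
        .collect()
        .map(r => (r.getLong(0), r.getString(1)))
        .toSeq
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Each returned pair holds the tweet's id and one mentioned screen name, so a
tweet with two mentions yields two rows. In spark-shell you would skip the
context setup and use the existing sc and sqlContext instead.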


On Wed, Jun 24, 2015 at 8:34 AM, Yin Huai <yh...@databricks.com> wrote:

> The function accepted by explode is f: Row => TraversableOnce[A]. It seems
> user_mentions is an array of structs, which reaches the function as a
> Seq[Row] rather than an Array[Row]. So, can you change your pattern
> matching to the following?
>
> case Row(rows: Seq[_]) => rows.asInstanceOf[Seq[Row]].map(elem => ...)
>
> On Wed, Jun 24, 2015 at 5:27 AM, Gustavo Arjones <
> garjo...@socialmetrix.com> wrote:
>
>> Hi All,
>>
>> I am using the new *Apache Spark version 1.4.0 Data-frames API* to
>> extract information from Twitter's Status JSON, mostly focused on the
>> Entities Object <https://dev.twitter.com/overview/api/entities>. The
>> relevant part for this question is shown below:
>>
>> {
>>   ...
>>   ...
>>   "entities": {
>>     "hashtags": [],
>>     "trends": [],
>>     "urls": [],
>>     "user_mentions": [
>>       {
>>         "screen_name": "linobocchini",
>>         "name": "Lino Bocchini",
>>         "id": 187356243,
>>         "id_str": "187356243",
>>         "indices": [ 3, 16 ]
>>       },
>>       {
>>         "screen_name": "jeanwyllys_real",
>>         "name": "Jean Wyllys",
>>         "id": 111123176,
>>         "id_str": "111123176",
>>         "indices": [ 79, 95 ]
>>       }
>>     ],
>>     "symbols": []
>>   },
>>   ...
>>   ...
>> }
>>
>> There are several examples of how to extract information from primitive
>> types such as string, integer, etc., but I couldn't find anything on how
>> to process this kind of *complex* structure.
>>
>> I tried the code below, but it still doesn't work; it throws an
>> exception:
>>
>> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>
>> val tweets = sqlContext.read.json("tweets.json")
>>
>> // this function just filters out empty entities.user_mentions[] nodes;
>> // some tweets don't contain any mentions
>> import org.apache.spark.sql.functions.udf
>> val isEmpty = udf((value: List[Any]) => value.isEmpty)
>>
>> import org.apache.spark.sql._
>> import sqlContext.implicits._
>> case class UserMention(id: Long, idStr: String, indices: Array[Long], name: String, screenName: String)
>>
>> val mentions = tweets.select("entities.user_mentions").
>>   filter(!isEmpty($"user_mentions")).
>>   explode($"user_mentions") {
>>   case Row(arr: Array[Row]) => arr.map { elem =>
>>     UserMention(
>>       elem.getAs[Long]("id"),
>>       elem.getAs[String]("id_str"),
>>       elem.getAs[Array[Long]]("indices"),
>>       elem.getAs[String]("name"),
>>       elem.getAs[String]("screen_name"))
>>   }
>> }
>>
>> mentions.first
>>
>> Exception when I try to call mentions.first:
>>
>> scala>     mentions.first
>> 15/06/23 22:15:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
>> scala.MatchError: [List([187356243,187356243,List(3, 16),Lino Bocchini,linobocchini], [111123176,111123176,List(79, 95),Jean Wyllys,jeanwyllys_real])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
>>     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
>>     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
>>     at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
>>     at org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:81)
>>
>> What is wrong here? I understand it is related to the types, but I
>> couldn't figure it out yet.
>>
>> As additional context, the structure mapped automatically is:
>>
>> scala> mentions.printSchema
>> root
>>  |-- user_mentions: array (nullable = true)
>>  |    |-- element: struct (containsNull = true)
>>  |    |    |-- id: long (nullable = true)
>>  |    |    |-- id_str: string (nullable = true)
>>  |    |    |-- indices: array (nullable = true)
>>  |    |    |    |-- element: long (containsNull = true)
>>  |    |    |-- name: string (nullable = true)
>>  |    |    |-- screen_name: string (nullable = true)
>>
>> *NOTE 1:* I know it is possible to solve this using HiveQL, but I would
>> like to use Data-frames since there is so much momentum around them.
>>
>> SELECT explode(entities.user_mentions) as mentions
>> FROM tweets
>>
>> *NOTE 2:* the *UDF* val isEmpty = udf((value: List[Any]) =>
>> value.isEmpty) is an ugly hack and I'm missing something here, but it was
>> the only way I came up with to avoid an NPE.
>>
>> I’ve posted the same question on SO:
>> http://stackoverflow.com/questions/31016156/how-to-extract-complex-json-structures-using-apache-spark-1-4-0-data-frames
>>
>> Thanks all!
>> - gustavo
>>
>>
>
