Hi,

I've come across some strange behaviour with Spark 1.6.0.

In the code below, the filter on "eventName" only seems to work if I
call .cache on the resulting DataFrame.

If I don't do this, the code crashes inside the UDF because it processes an
event that the filter should have removed.

Any ideas why this might be the case?

The code is as follows:

    val df = sqlContext.read.parquet(inputPath)
    val filtered = df.filter(df("eventName").equalTo(Created))
    // Caching seems to be required for the filter to work
    val extracted = extractEmailReferences(sqlContext, filtered.cache)
    extracted.write.parquet(outputPath)


where extractEmailReferences does this:

    def extractEmailReferences(sqlContext: SQLContext, df: DataFrame): DataFrame = {
      val extracted = df.select(
        df(EventFieldNames.ObjectId),
        extractReferencesUDF(
          df(EventFieldNames.EventJson),
          df(EventFieldNames.ObjectId),
          df(EventFieldNames.UserId)) as "references")

      extracted.filter(extracted("references").notEqual("UNKNOWN"))
    }


and extractReferencesUDF:

    def extractReferencesUDF = udf(extractReferences(_: String, _: String, _: String))

    def extractReferences(eventJson: String, objectId: String, userId: String): String = {
      import org.json4s.jackson.Serialization
      import org.json4s.NoTypeHints
      implicit val formats = Serialization.formats(NoTypeHints)

      // This is where the code crashes if .cache isn't called
      val created = Serialization.read[GMailMessage.Created](eventJson)
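
A possible stopgap, whatever the reason the filter is not applied first, would be to make the UDF tolerate events it cannot parse. The following is only a minimal sketch, assuming it is acceptable to map any parse failure to "UNKNOWN" so that the existing notEqual("UNKNOWN") filter drops those rows. The names SafeExtract, safeExtract and parse are hypothetical; parse stands in for the Serialization.read call plus whatever the rest of extractReferences does with the result.

```scala
import scala.util.Try

object SafeExtract {
  // Hypothetical helper: `parse` stands in for the real JSON parsing logic.
  // Any non-fatal exception thrown by `parse` is turned into "UNKNOWN".
  def safeExtract(parse: String => String)(eventJson: String): String =
    Try(parse(eventJson)).getOrElse("UNKNOWN")
}
```

This would hide genuine parse errors as well, so it only masks the symptom and does not explain why the filtered-out event reaches the UDF.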


 Regards,

James
