Re: DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0
I am not a Spark committer, so I cannot be the shepherd :-)
Re: DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0
Hi Ted,

Finally got round to creating this:
https://issues.apache.org/jira/browse/SPARK-13773

I hope you don't mind me selecting you as the shepherd for this ticket.

Regards,

James
Re: DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0
Hi Ted,

Thanks for getting back - I realised my mistake... I was clicking the little drop-down menu on the right-hand side of the Create button (it looks as if it's part of the button). When I clicked directly on the word "Create" I got a form that made more sense and allowed me to choose the project.

Regards,

James
Re: DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0
Have you tried clicking on the Create button from an existing Spark JIRA? e.g.
https://issues.apache.org/jira/browse/SPARK-4352

Once you're logged in, you should be able to select Spark as the Project.

Cheers
Re: DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0
Hi,

So I managed to isolate the bug and I'm ready to try raising a JIRA issue. I joined the Apache Jira project so I can create tickets.

However, when I click Create from the Spark project home page on JIRA, it asks me to click on one of the following service desks: Kylin, Atlas, Ranger, Apache Infrastructure. There doesn't seem to be an option for me to raise an issue for Spark?!

Regards,

James
Re: DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0
Sure thing, I'll see if I can isolate this.

Regards.

James

On 4 March 2016 at 12:24, Ted Yu wrote:

> If you can reproduce the following with a unit test, I suggest you open a JIRA.
>
> Thanks
>
> On Mar 4, 2016, at 4:01 AM, James Hammerton wrote:
>
> Hi,
>
> I've come across some strange behaviour with Spark 1.6.0.
>
> In the code below, the filtering by "eventName" only seems to work if I call .cache on the resulting DataFrame.
>
> If I don't do this, the code crashes inside the UDF because it processes an event that the filter should get rid of.
>
> Any ideas why this might be the case?
>
> The code is as follows:
>
>     val df = sqlContext.read.parquet(inputPath)
>     val filtered = df.filter(df("eventName").equalTo(Created))
>     val extracted = extractEmailReferences(sqlContext, filtered.cache) // Caching seems to be required for the filter to work
>     extracted.write.parquet(outputPath)
>
> where extractEmailReferences does this:
>
>     def extractEmailReferences(sqlContext: SQLContext, df: DataFrame): DataFrame = {
>       val extracted = df.select(df(EventFieldNames.ObjectId),
>         extractReferencesUDF(df(EventFieldNames.EventJson),
>           df(EventFieldNames.ObjectId), df(EventFieldNames.UserId)) as "references")
>       extracted.filter(extracted("references").notEqual("UNKNOWN"))
>     }
>
> and extractReferencesUDF:
>
>     def extractReferencesUDF = udf(extractReferences(_: String, _: String, _: String))
>
>     def extractReferences(eventJson: String, objectId: String, userId: String): String = {
>       import org.json4s.jackson.Serialization
>       import org.json4s.NoTypeHints
>       implicit val formats = Serialization.formats(NoTypeHints)
>
>       val created = Serialization.read[GMailMessage.Created](eventJson) // This is where the code crashes if the .cache isn't called
>
> Regards,
>
> James
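For reference, a self-contained test along the lines Ted suggests might look like the sketch below. It is only a sketch under assumptions, not the reproduction attached to SPARK-13773: the column names ("eventName", "eventJson"), the sample rows, and the throwing UDF are hypothetical stand-ins for James's pipeline, and the round trip through a temporary Parquet directory mirrors the original job's Parquet input, on which the behaviour may depend.

    import java.nio.file.Files

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions.udf

    // Sketch of a minimal reproduction for the reported behaviour (Spark 1.6.x, local mode).
    object FilterCacheRepro {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("SPARK-13773-repro").setMaster("local[2]"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        // Hypothetical stand-in for the event data: one row the filter should keep,
        // one it should remove.
        val events = sc.parallelize(Seq(
          ("Created", """{"id":"1"}"""),
          ("Deleted", "not-json")
        )).toDF("eventName", "eventJson")

        // Round-trip through Parquet, since the original job reads Parquet input.
        val path = Files.createTempDirectory("spark-13773").resolve("events").toString
        events.write.parquet(path)
        val df = sqlContext.read.parquet(path)

        // A UDF that fails loudly on any row the filter should have removed,
        // standing in for the json4s deserialization crash in extractReferences.
        val extract = udf { (json: String) =>
          if (!json.trim.startsWith("{")) sys.error("UDF saw an unfiltered row: " + json)
          "ref"
        }

        val filtered = df.filter(df("eventName").equalTo("Created"))
        // Per the report, this throws unless `filtered.cache` is inserted first.
        filtered.select(extract(df("eventJson")) as "references").collect()

        sc.stop()
      }
    }

If a test like this passes without the .cache call, the trigger presumably lies in details of the real data or schema that these stand-ins don't capture.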