It would be great if you could try:

sql("SET spark.sql.inMemoryColumnarStorage.partitionPruning=false")

Also, try Spark 1.5.2-RC2: <http://people.apache.org/~pwendell/spark-releases/spark-1.5.2-rc2-bin/>
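The difference between `===` and the null-safe `<=>` that comes up downthread can be modeled in plain Scala. This is a minimal sketch of SQL's three-valued equality semantics, no Spark required; the function names are illustrative, not Spark API:

```scala
// SQL-style equality: comparing anything with NULL yields NULL
// (modeled here as None), not false.
def eqNullUnsafe(a: String, b: String): Option[Boolean] =
  if (a == null || b == null) None else Some(a == b)

// Null-safe equality (what <=> does): two nulls compare equal and the
// result is never NULL. Scala's == on references is already null-safe.
def eqNullSafe(a: String, b: String): Boolean = a == b

// A SQL filter keeps a row only when the predicate is definitely true,
// so rows where the predicate evaluates to NULL are dropped.
def sqlFilter(rows: Seq[String], p: String => Option[Boolean]): Seq[String] =
  rows.filter(r => p(r).contains(true))
```

Under these semantics a null in the compared column cannot by itself explain a *smaller* distinct count for non-null values, which is why the symptom below points at the cached columnar scan rather than at `===` itself.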
On Fri, Nov 6, 2015 at 4:49 AM, Seongduk Cheon <s.c...@opt.ne.jp> wrote:
> Hi Yanal!
>
> Yes, exactly. I read from a csv file and convert it to a DF with column names.
>
> It simply looks like this:
>
> val eventDF = sc.textFile(eventFile).map(_.split(",")).filter(_.size >= 6)
>   .map { e => .... // do something
>   }.toDF(eventTableColumns:_*).cache()
>
> The result of the <=> function is:
>
> scala> eventDF.filter($"entityType" <=> "user").select("entityId").distinct.count
> res25: Long = 2091
>
> As you mentioned, it seems related to a nullable column.
> Using a case class works as expected. It is one of the best workarounds so far.
>
>
> 2015-11-06 19:01 GMT+09:00 Yanal Wazaefi <yanal.waza...@kelkoo.com>:
>
>> Hi Sondoku,
>>
>> Are you converting an event RDD using the toDF("id", "event", "entityType", "entityId",
>> "targetEntityType", "targetEntityId", "properties") function to get your eventDF?
>>
>> Does <=> give you the correct result (2091) too?
>> eventDF.filter($"entityType" <=> "user").select("entityId").distinct.count
>>
>> I had the same problem with DataFrame equality, using the toDF("col1", "col2", ...) function.
>>
>> To resolve this problem (bug?), I used a *case class* and then applied the toDF() function.
>> Something like this in your case:
>>
>> case class Event(id: String, event: String, entityType: String, entityId: String,
>>   targetEntityType: String, targetEntityId: String, properties: String)
>>
>> eventRDD.map { case (id, event, entityType, entityId, targetEntityType, targetEntityId, properties) =>
>>   Event(id, event, entityType, entityId, targetEntityType, targetEntityId, properties)
>> }.toDF()
>>
>> The === comparison should work in this case.
>>
>> The problem (I think) comes from some null values in the columns that come before the user column (e.g. id or event).
>>
>> Yanal
>>
>> Subject: Re: DataFrame equality does not work in 1.5.1
>> Date: Fri, 6 Nov 2015 02:14:18 +0100
>> From: Seongduk Cheon <s.c...@opt.ne.jp>
>> To: Yin Huai <yh...@databricks.com>
>> CC: user <user@spark.apache.org>
>>
>> Hi, Yin
>>
>> Thanks for your time. This is the result:
>>
>> ------------------
>> scala> eventDF.filter($"entityType" === "user").select("entityId").distinct.explain(true)
>> == Parsed Logical Plan ==
>> Aggregate [entityId#16], [entityId#16]
>>  Project [entityId#16]
>>   Filter (entityType#15 = user)
>>    Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]
>>     LogicalRDD [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12], MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61
>>
>> == Analyzed Logical Plan ==
>> entityId: string
>> Aggregate [entityId#16], [entityId#16]
>>  Project [entityId#16]
>>   Filter (entityType#15 = user)
>>    Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]
>>     LogicalRDD [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12], MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61
>>
>> == Optimized Logical Plan ==
>> Aggregate [entityId#16], [entityId#16]
>>  Project [entityId#16]
>>   Filter (entityType#15 = user)
>>    InMemoryRelation [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]), None
>>
>> == Physical Plan ==
>> TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
>>  TungstenExchange hashpartitioning(entityId#16)
>>   TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
>>    Project [entityId#16]
>>     Filter (entityType#15 = user)
>>      InMemoryColumnarTableScan [entityId#16,entityType#15], [(entityType#15 = user)], (InMemoryRelation [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]), None)
>>
>> Code Generation: true
>>
>> scala>
>>
>>
>> 2015-11-06 5:27 GMT+09:00 Yin Huai <yh...@databricks.com>:
>>
>>> Can you attach the result of eventDF.filter($"entityType" === "user").select("entityId").distinct.explain(true)?
>>>
>>> Thanks,
>>>
>>> Yin
>>>
>>> On Thu, Nov 5, 2015 at 1:12 AM, 千成徳 <s.c...@opt.ne.jp> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I have a data frame like this.
>>>>
>>>> The equality expression does not work in 1.5.1, but works as expected in 1.4.0.
>>>> What is the difference?
>>>>
>>>> scala> eventDF.printSchema()
>>>> root
>>>>  |-- id: string (nullable = true)
>>>>  |-- event: string (nullable = true)
>>>>  |-- entityType: string (nullable = true)
>>>>  |-- entityId: string (nullable = true)
>>>>  |-- targetEntityType: string (nullable = true)
>>>>  |-- targetEntityId: string (nullable = true)
>>>>  |-- properties: string (nullable = true)
>>>>
>>>> scala> eventDF.groupBy("entityType").agg(countDistinct("entityId")).show
>>>> +----------+------------------------+
>>>> |entityType|COUNT(DISTINCT entityId)|
>>>> +----------+------------------------+
>>>> |   ib_user|                    4751|
>>>> |      user|                    2091|
>>>> +----------+------------------------+
>>>>
>>>> ----- does not work (bug?)
>>>> scala> eventDF.filter($"entityType" === "user").select("entityId").distinct.count
>>>> res151: Long = 1219
>>>>
>>>> scala> eventDF.filter(eventDF("entityType") === "user").select("entityId").distinct.count
>>>> res153: Long = 1219
>>>>
>>>> scala> eventDF.filter($"entityType" equalTo "user").select("entityId").distinct.count
>>>> res149: Long = 1219
>>>>
>>>> ----- works as expected
>>>> scala> eventDF.map{ e => (e.getAs[String]("entityId"), e.getAs[String]("entityType")) }.filter(x => x._2 == "user").map(_._1).distinct.count
>>>> res150: Long = 2091
>>>>
>>>> scala> eventDF.filter($"entityType" in "user").select("entityId").distinct.count
>>>> warning: there were 1 deprecation warning(s); re-run with -deprecation for details
>>>> res155: Long = 2091
>>>>
>>>> scala> eventDF.filter($"entityType" !== "ib_user").select("entityId").distinct.count
>>>> res152: Long = 2091
>>>>
>>>> All of the above code works in 1.4.0.
>>>>
>>>> Thanks.
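A plausible mechanism for the wrong counts, consistent with the InMemoryColumnarTableScan in the plan and the partitionPruning setting suggested at the top of the thread, is that per-batch min/max statistics used to skip cached batches go wrong when a column contains nulls. This is a speculative, plain-Scala toy model of stats-based batch skipping, purely illustrative and not Spark's actual implementation:

```scala
// Toy model of batch-level min/max pruning over a cached string column.
// All names here are illustrative.
case class BatchStats(min: String, max: String)

// Correct behavior: stats are computed over non-null values only.
def statsOf(batch: Seq[String]): BatchStats = {
  val nonNull = batch.filter(_ != null)
  BatchStats(nonNull.min, nonNull.max)
}

// A batch may be skipped for the predicate col = v only when v is
// provably outside [min, max]; otherwise it must be scanned.
def mustScan(stats: BatchStats, v: String): Boolean =
  stats.min <= v && v <= stats.max
```

If the stats recorded for a null-containing batch were narrower than the batch's real non-null range, mustScan could return false for a batch that actually holds matching rows, and those rows would silently vanish from the count. That would also explain why disabling spark.sql.inMemoryColumnarStorage.partitionPruning, or avoiding the cached relation entirely (the RDD-side filter above), restores 2091.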
>>>> >>>> >>> >> >> >> -- >> ------------------------------------------------------- >> 千 成徳 (Sondoku Chon) >> >> 株 式 会社オプトホールディング http://www.opt.ne.jp/holding/ >> データサイエンスラボ https://datasciencelab.jp/ >> ビッ クデータアーキテクト >> >> 〒 102-0081 東京都千代田区四番町6東急番町ビル >> Tel:080-4581-9708 >> Fax:050-3156-7346 >> ------------------------------------------------------- >> >> >> >> >> ------------------------------ >> Kelkoo SAS >> Société par Actions Simplifiée >> Au capital de € 4.168.964,30 >> Siège social : 158 Ter Rue du Temple 75003 Paris >> 425 093 069 RCS Paris >> >> Ce message et les pièces jointes sont confidentiels et établis à >> l'attention exclusive de leurs destinataires. Si vous n'êtes pas le >> destinataire de ce message, merci de le détruire et d'en avertir >> l'expéditeur. >> > > > > -- > ------------------------------------------------------- > 千 成徳 (Sondoku Chon) > > 株式会社オプトホールディング http://www.opt.ne.jp/holding/ > データサイエンスラボ https://datasciencelab.jp/ > ビックデータアーキテクト > > 〒102-0081 東京都千代田区四番町6東急番町ビル > Tel:080-4581-9708 > Fax:050-3156-7346 > ------------------------------------------------------- >