Hi Shiyuan, can you show us the output of explain() over df (as a last step)?
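(For anyone following along, a minimal sketch of that request is shown below; df is assumed to be the final DataFrame from the code further down the thread, and spark an active SparkSession. Passing True to explain() prints the parsed, analyzed, optimized and physical plans rather than only the physical plan, and the analyzed plan is where the "Resolved attribute(s) ... missing" failure surfaces.)

# Sketch only: df is assumed to be the final DataFrame from the snippet below.
# Note that in this thread the AnalysisException is raised by the final join
# itself, so explain() can only be called on the steps that still analyze
# successfully.
df.explain(True)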
On 11 April 2018 at 19:47, Shiyuan <gshy2...@gmail.com> wrote:

Variable name binding is a Python thing, and Spark should not care how the variable is named. What matters is the dependency graph. Spark fails to handle this dependency graph correctly, which I find quite surprising: this is just a simple combination of three very common SQL operations.

On Tue, Apr 10, 2018 at 9:03 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi Shiyuan,

I do not know whether I am right, but I would prefer to avoid expressions in Spark such as:

df = <<some transformation on df>>

Regards,
Gourav Sengupta

On Tue, Apr 10, 2018 at 10:42 PM, Shiyuan <gshy2...@gmail.com> wrote:

Here is the pretty print of the plan, which reveals some details about what causes the bug (see the lines marked with asterisks): withColumnRenamed() fails to update the dependency graph correctly.

'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear in the operation: kk. Please check if the right attribute(s) are used.'

Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
+- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
 :- Project [ID#64, score#67, LABEL#65, kk#73L]
 : +- Join Inner, (ID#64 = ID#99)
 : :- Project [ID#64, score#67, LABEL#65, kk#73L]
 : : +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
 : : +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
 : +- Project [ID#99]
 : +- Filter (nL#90L > cast(1 as bigint))
 : +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS nL#90L]
 : +- Project [ID#99, score#102, LABEL#100, kk#73L]
 : +- Project [ID#99, LABEL#100, k#101L AS kk#73L, score#102]
 : +- LogicalRDD [ID#99, LABEL#100, k#101L, score#102]
+- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
 +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS count#118L]
 +- Project [ID#135, score#138, LABEL#136, kk#128L]
 +- Join Inner, (ID#135 = ID#99)
 :- Project [ID#135, score#138, LABEL#136, kk#128L]
 : +- *Project [ID#135, LABEL#136, k#137L AS kk#128L, score#138]*
 : +- LogicalRDD [ID#135, LABEL#136, k#137L, score#138]
 +- Project [ID#99]
 +- Filter (nL#90L > cast(1 as bigint))
 +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS nL#90L]
 +- *!Project [ID#99, score#102, LABEL#100, kk#128L]*
 +- *Project [ID#99, LABEL#100, k#101L AS kk#73L, score#102]*
 +- LogicalRDD [ID#99, LABEL#100, k#101L, score#102]

Here is the code which generates the error:

import pyspark.sql.functions as F
from pyspark.sql import Row
df = spark.createDataFrame([Row(score=1.0, ID='abc', LABEL=True, k=2),
                            Row(score=1.0, ID='abc', LABEL=False, k=3)]) \
    .withColumnRenamed("k", "kk").select("ID", "score", "LABEL", "kk")
df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
df = df.join(df_t.select("ID"), ["ID"])
df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID", "kk"])

On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan <gshy2...@gmail.com> wrote:

The Spark warning about Row instead of Dict is not the culprit. The problem still persists after I use Row instead of Dict to generate the dataframe.
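(One workaround that is sometimes suggested for this class of analyzer error, though not something verified in this thread, is to cut the lineage of the aggregated frame before the second join, so that the two sides of that join no longer share attribute IDs. The sketch below assumes an active SparkSession named spark and the same columns as in the snippet above.)

import pyspark.sql.functions as F
from pyspark.sql import Row

df = spark.createDataFrame([Row(score=1.0, ID='abc', LABEL=True, k=2),
                            Row(score=1.0, ID='abc', LABEL=False, k=3)]) \
    .withColumnRenamed("k", "kk").select("ID", "score", "LABEL", "kk")
df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
df = df.join(df_t.select("ID"), ["ID"])

df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
# Rebuild df_sw from its RDD and schema so it gets fresh attribute IDs and no
# longer shares a subtree with df's own plan (at the cost of an optimization
# barrier between the two stages).
df_sw = spark.createDataFrame(df_sw.rdd, df_sw.schema)

df = df.join(df_sw, ["ID", "kk"])

A similar effect can be had by checkpointing df_sw (after setting a checkpoint directory) or by writing it out and reading it back; whether any of these is acceptable depends on the size of the data.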
Here is the explain() output regarding the reassignment of df, as Gourav suggested to run. The two plans look the same except that the serial numbers following the columns are different (e.g. ID#7273 vs. ID#7344).

This is the output of df.explain() after df = df.join(df_t.select("ID"),["ID"]):

== Physical Plan ==
*(6) Project [ID#7273, score#7276, LABEL#7274, kk#7281L]
+- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner
 :- *(2) Sort [ID#7273 ASC NULLS FIRST], false, 0
 : +- Exchange hashpartitioning(ID#7273, 200)
 : +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS kk#7281L]
 : +- *(1) Filter isnotnull(ID#7273)
 : +- *(1) Scan ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276]
 +- *(5) Sort [ID#7303 ASC NULLS FIRST], false, 0
 +- *(5) Project [ID#7303]
 +- *(5) Filter (nL#7295L > 1)
 +- *(5) HashAggregate(keys=[ID#7303], functions=[finalmerge_count(distinct merge count#7314L) AS count(LABEL#7304)#7294L])
 +- Exchange hashpartitioning(ID#7303, 200)
 +- *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct LABEL#7304) AS count#7314L])
 +- *(4) HashAggregate(keys=[ID#7303, LABEL#7304], functions=[])
 +- Exchange hashpartitioning(ID#7303, LABEL#7304, 200)
 +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304], functions=[])
 +- *(3) Project [ID#7303, LABEL#7304]
 +- *(3) Filter isnotnull(ID#7303)
 +- *(3) Scan ExistingRDD[ID#7303,LABEL#7304,k#7305L,score#7306]

In comparison, this is the output of df1.explain() after df1 = df.join(df_t.select("ID"),["ID"]):

== Physical Plan ==
*(6) Project [ID#7344, score#7347, LABEL#7345, kk#7352L]
+- *(6) SortMergeJoin [ID#7344], [ID#7374], Inner
 :- *(2) Sort [ID#7344 ASC NULLS FIRST], false, 0
 : +- Exchange hashpartitioning(ID#7344, 200)
 : +- *(1) Project [ID#7344, score#7347, LABEL#7345, k#7346L AS kk#7352L]
 : +- *(1) Filter isnotnull(ID#7344)
 : +- *(1) Scan ExistingRDD[ID#7344,LABEL#7345,k#7346L,score#7347]
 +- *(5) Sort [ID#7374 ASC NULLS FIRST], false, 0
 +- *(5) Project [ID#7374]
 +- *(5) Filter (nL#7366L > 1)
 +- *(5) HashAggregate(keys=[ID#7374], functions=[finalmerge_count(distinct merge count#7385L) AS count(LABEL#7375)#7365L])
 +- Exchange hashpartitioning(ID#7374, 200)
 +- *(4) HashAggregate(keys=[ID#7374], functions=[partial_count(distinct LABEL#7375) AS count#7385L])
 +- *(4) HashAggregate(keys=[ID#7374, LABEL#7375], functions=[])
 +- Exchange hashpartitioning(ID#7374, LABEL#7375, 200)
 +- *(3) HashAggregate(keys=[ID#7374, LABEL#7375], functions=[])
 +- *(3) Project [ID#7374, LABEL#7375]
 +- *(3) Filter isnotnull(ID#7374)
 +- *(3) Scan ExistingRDD[ID#7374,LABEL#7375,k#7376L,score#7377]

Here is the code I run and the error I get in Spark 2.3.0. By looking at the error, the cause seems to be that Spark does not look up a column by its name but by a serial number, and that serial number somehow gets messed up.
import pyspark.sql.functions as F
from pyspark.sql import Row
df = spark.createDataFrame([Row(score=1.0, ID='abc', LABEL=True, k=2),
                            Row(score=1.0, ID='abc', LABEL=False, k=3)])

df = df.withColumnRenamed("k", "kk").select("ID", "score", "LABEL", "kk")  # line B
df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
df = df.join(df_t.select("ID"), ["ID"])
df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID", "kk"])

This is the error:

'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear in the operation: kk. Please check if the right attribute(s) are used.;;
Project [ID#88, kk#96L, score#91, LABEL#89, cnt1#140L]
+- Join Inner, ((ID#88 = ID#150) && (kk#96L = kk#144L))
 :- Project [ID#88, score#91, LABEL#89, kk#96L]
 : +- Join Inner, (ID#88 = ID#118)
 : :- Project [ID#88, score#91, LABEL#89, kk#96L]
 : : +- Project [ID#88, LABEL#89, k#90L AS kk#96L, score#91]
 : : +- LogicalRDD [ID#88, LABEL#89, k#90L, score#91], false
 : +- Project [ID#118]
 : +- Filter (nL#110L > cast(1 as bigint))
 : +- Aggregate [ID#118], [ID#118, count(distinct LABEL#119) AS nL#110L]
 : +- Project [ID#118, score#121, LABEL#119, kk#96L]
 : +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]
 : +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false
+- Project [ID#150, kk#144L, count#134L AS cnt1#140L]
 +- Aggregate [ID#150, kk#144L], [ID#150, kk#144L, count(1) AS count#134L]
 +- Project [ID#150, score#153, LABEL#151, kk#144L]
 +- Join Inner, (ID#150 = ID#118)
 :- Project [ID#150, score#153, LABEL#151, kk#144L]
 : +- Project [ID#150, LABEL#151, k#152L AS kk#144L, score#153]
 : +- LogicalRDD [ID#150, LABEL#151, k#152L, score#153], false
 +- Project [ID#118]
 +- Filter (nL#110L > cast(1 as bigint))
 +- Aggregate [ID#118], [ID#118, count(distinct LABEL#119) AS nL#110L]
 +- !Project [ID#118, score#121, LABEL#119, kk#144L]
 +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]
 +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false'

On Mon, Apr 9, 2018 at 3:21 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi,

What I am curious about is the reassignment of df.

Can you please look into the explain plan of df after the statement df = df.join(df_t.select("ID"),["ID"]), and then compare it with the explain plan of df1 after the statement df1 = df.join(df_t.select("ID"),["ID"])?

It's late here and I am yet to go through this completely, but I think that Spark does throw a warning telling us to use Row instead of a dictionary.
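(For what it's worth, that inference warning can also be sidestepped by passing an explicit schema rather than relying on inference from Row objects or dictionaries. A minimal sketch, assuming the same four columns used in this thread and an active SparkSession named spark:)

from pyspark.sql.types import StructType, StructField, StringType, BooleanType, LongType, DoubleType

schema = StructType([
    StructField("ID", StringType()),
    StructField("LABEL", BooleanType()),
    StructField("k", LongType()),
    StructField("score", DoubleType()),
])
# Rows are given as plain tuples in schema order, so no schema inference
# (and no inference warning) is involved.
df = spark.createDataFrame([("abc", True, 2, 1.0), ("abc", False, 3, 1.0)], schema)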
It will be of help if you could kindly try using the below statement and go through your use case once again (I am yet to go through all the lines):

from pyspark.sql import Row

df = spark.createDataFrame([Row(score=1.0, ID="abc", LABEL=True, k=2),
                            Row(score=1.0, ID="abc", LABEL=True, k=3)])

Regards,
Gourav Sengupta

On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gshy2...@gmail.com> wrote:

Hi Spark Users,

The following code snippet has an "attribute missing" error while the attribute exists. This bug is triggered by a particular sequence of "select", "groupby" and "join". Note that if I take away the "select" in line B, the code runs without error. However, the "select" in line B includes all columns in the dataframe and hence should not affect the final result.

import pyspark.sql.functions as F
df = spark.createDataFrame([{'score': 1.0, 'ID': 'abc', 'LABEL': True, 'k': 2},
                            {'score': 1.0, 'ID': 'abc', 'LABEL': False, 'k': 3}])

df = df.withColumnRenamed("k", "kk")\
       .select("ID", "score", "LABEL", "kk")  # line B

df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
df = df.join(df_t.select("ID"), ["ID"])
df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID", "kk"])
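(Since the original message reports that dropping the select on line B is enough to make the pipeline run, here is a sketch of that variant for easy comparison. It assumes an active SparkSession named spark and is otherwise the same code as above, with Row used instead of dictionaries.)

import pyspark.sql.functions as F
from pyspark.sql import Row

df = spark.createDataFrame([Row(score=1.0, ID='abc', LABEL=True, k=2),
                            Row(score=1.0, ID='abc', LABEL=False, k=3)])

# Same pipeline, but without the redundant select from line B.
df = df.withColumnRenamed("k", "kk")

df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
df = df.join(df_t.select("ID"), ["ID"])
df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID", "kk"])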