Here is the pretty print of the physical plan which reveals some details
about what causes the bug (see the lines highlighted in bold):
WithColumnRenamed() fails to update the dependency graph correctly:


'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121
in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s)
with the same name appear in the operation: kk. Please check if the right
attribute(s) are used

Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
+- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
   :- Project [ID#64, score#67, LABEL#65, kk#73L]
   :  +- Join Inner, (ID#64 = ID#99)
   :     :- Project [ID#64, score#67, LABEL#65, kk#73L]
   :     :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
   :     :     +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
   :     +- Project [ID#99]
   :        +- Filter (nL#90L > cast(1 as bigint))
   :           +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS
nL#90L]
   :              +- Project [ID#99, score#102, LABEL#100, kk#73L]
   :                 +- Project [ID#99, LABEL#100, k#101L AS kk#73L,
score#102]
   :                    +- LogicalRDD [ID#99, LABEL#100, k#101L, score#102]
   +- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
      +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS
count#118L]
         +- Project [ID#135, score#138, LABEL#136, kk#128L]
            +- Join Inner, (ID#135 = ID#99)
               :- Project [ID#135, score#138, LABEL#136, kk#128L]
               :  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L,
score#138]*
               :     +- LogicalRDD [ID#135, LABEL#136, k#137L, score#138]
               +- Project [ID#99]
                  +- Filter (nL#90L > cast(1 as bigint))
                     +- Aggregate [ID#99], [ID#99, count(distinct
LABEL#100) AS nL#90L]
                        +- *!Project [ID#99, score#102, LABEL#100, kk#128L]*
                           +-* Project [ID#99, LABEL#100, k#101L AS kk#73L,
score#102]*
                              +- LogicalRDD [ID#99, LABEL#100, k#101L,
score#102]

Here is the code which generates the error:

import pyspark.sql.functions as F
from pyspark.sql import Row
df =
spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),Row(score=1.0,ID='abc',LABEL=False,k=3)]).withColumnRenamed("k","kk").select("ID","score","LABEL","kk")
df_t =
df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
df = df.join(df_t.select("ID"),["ID"])
df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID","kk"])


On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan <gshy2...@gmail.com> wrote:

> The spark warning about Row instead of Dict is not the culprit. The
> problem still persists after I use Row instead of Dict to generate the
> dataframe.
>
> Here is the expain() output regarding the reassignment of df as Gourav
> suggests to run, They look the same except that  the serial numbers
> following the columns are different(eg. ID#7273 vs. ID#7344).
>
> this is the output of df.explain() after df = df.join(df_t.select("ID"),["
> ID"])
> == Physical Plan == *(6) Project [ID#7273, score#7276, LABEL#7274,
> kk#7281L] +- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner :- *(2) Sort
> [ID#7273 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7273,
> 200) : +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS
> kk#7281L] : +- *(1) Filter isnotnull(ID#7273) : +- *(1) Scan
> ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276] +- *(5) Sort [ID#7303
> ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7303] +- *(5) Filter
> (nL#7295L > 1) +- *(5) HashAggregate(keys=[ID#7303],
> functions=[finalmerge_count(distinct merge count#7314L) AS
> count(LABEL#7304)#7294L]) +- Exchange hashpartitioning(ID#7303, 200) +-
> *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct
> LABEL#7304) AS count#7314L]) +- *(4) HashAggregate(keys=[ID#7303,
> LABEL#7304], functions=[]) +- Exchange hashpartitioning(ID#7303,
> LABEL#7304, 200) +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304],
> functions=[]) +- *(3) Project [ID#7303, LABEL#7304] +- *(3) Filter
> isnotnull(ID#7303) +- *(3) Scan 
> ExistingRDD[ID#7303,LABEL#7304,k#7305L,score#7306]
>
>
> In comparison, this is the output of df1.explain() after  df1 =
> df.join(df_t.select("ID"),["ID"])?
> == Physical Plan == *(6) Project [ID#7344, score#7347, LABEL#7345,
> kk#7352L] +- *(6) SortMergeJoin [ID#7344], [ID#7374], Inner :- *(2) Sort
> [ID#7344 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7344,
> 200) : +- *(1) Project [ID#7344, score#7347, LABEL#7345, k#7346L AS
> kk#7352L] : +- *(1) Filter isnotnull(ID#7344) : +- *(1) Scan
> ExistingRDD[ID#7344,LABEL#7345,k#7346L,score#7347] +- *(5) Sort [ID#7374
> ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7374] +- *(5) Filter
> (nL#7366L > 1) +- *(5) HashAggregate(keys=[ID#7374],
> functions=[finalmerge_count(distinct merge count#7385L) AS
> count(LABEL#7375)#7365L]) +- Exchange hashpartitioning(ID#7374, 200) +-
> *(4) HashAggregate(keys=[ID#7374], functions=[partial_count(distinct
> LABEL#7375) AS count#7385L]) +- *(4) HashAggregate(keys=[ID#7374,
> LABEL#7375], functions=[]) +- Exchange hashpartitioning(ID#7374,
> LABEL#7375, 200) +- *(3) HashAggregate(keys=[ID#7374, LABEL#7375],
> functions=[]) +- *(3) Project [ID#7374, LABEL#7375] +- *(3) Filter
> isnotnull(ID#7374) +- *(3) Scan ExistingRDD[ID#7374,LABEL#
> 7375,k#7376L,score#7377]
>
>
> Here is the code I run and the error I get in Spark 2.3.0. By looking at
> the error,  the cause seems to be that  spark doesn't look up the column by
> its name but by a serial number and  the serial number somehow is messed
> up.
>
> import pyspark.sql.functions as F
> from pyspark.sql import Row
> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,
> k=2),Row(score=1.0,ID='abc',LABEL=False,k=3)])
>
> df = df.withColumnRenamed("k","kk").select("ID","score","LABEL","kk")
> #line B
> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("
> nL")).filter(F.col("nL")>1)
> df = df.join(df_t.select("ID"),["ID"])
> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
> df = df.join(df_sw, ["ID","kk"])
>
> This is the error:
> 'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121
> in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s)
> with the same name appear in the operation: kk. Please check if the right
> attribute(s) are used.;;\nProject [ID#88, kk#96L, score#91, LABEL#89,
> cnt1#140L]\n+- Join Inner, ((ID#88 = ID#150) && (kk#96L = kk#144L))\n :-
> Project [ID#88, score#91, LABEL#89, kk#96L]\n : +- Join Inner, (ID#88 =
> ID#118)\n : :- Project [ID#88, score#91, LABEL#89, kk#96L]\n : : +- Project
> [ID#88, LABEL#89, k#90L AS kk#96L, score#91]\n : : +- LogicalRDD [ID#88,
> LABEL#89, k#90L, score#91], false\n : +- Project [ID#118]\n : +- Filter
> (nL#110L > cast(1 as bigint))\n : +- Aggregate [ID#118], [ID#118,
> count(distinct LABEL#119) AS nL#110L]\n : +- Project [ID#118, score#121,
> LABEL#119, kk#96L]\n : +- Project [ID#118, LABEL#119, k#120L AS kk#96L,
> score#121]\n : +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121],
> false\n +- Project [ID#150, kk#144L, count#134L AS cnt1#140L]\n +-
> Aggregate [ID#150, kk#144L], [ID#150, kk#144L, count(1) AS count#134L]\n +-
> Project [ID#150, score#153, LABEL#151, kk#144L]\n +- Join Inner, (ID#150 =
> ID#118)\n :- Project [ID#150, score#153, LABEL#151, kk#144L]\n : +- Project
> [ID#150, LABEL#151, k#152L AS kk#144L, score#153]\n : +- LogicalRDD
> [ID#150, LABEL#151, k#152L, score#153], false\n +- Project [ID#118]\n +-
> Filter (nL#110L > cast(1 as bigint))\n +- Aggregate [ID#118], [ID#118,
> count(distinct LABEL#119) AS nL#110L]\n +- !Project [ID#118, score#121,
> LABEL#119, kk#144L]\n +- Project [ID#118, LABEL#119, k#120L AS kk#96L,
> score#121]\n +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false\n'
>
>
>
>
> On Mon, Apr 9, 2018 at 3:21 PM, Gourav Sengupta <gourav.sengu...@gmail.com
> > wrote:
>
>> Hi,
>>
>> what I am curious about is the reassignment of df.
>>
>> Can you please look into the explain plan of df after the statement df =
>> df.join(df_t.select("ID"),["ID"])? And then compare with the explain
>> plan of df1 after the statement df1 = df.join(df_t.select("ID"),["ID"])?
>>
>> Its late here, but I am yet to go through this completely.  But I think
>> that SPARK does throw a warning mentioning us to use Row instead of
>> Dictionary.
>>
>> It will be of help if you could kindly try using the below statement and
>> go through your used case once again (I am yet to go through all the lines):
>>
>>
>>
>> from pyspark.sql import Row
>>
>> df = spark.createDataFrame([Row(score = 1.0,ID="abc",LABEL=True,k=2),
>> Row(score = 1.0,ID="abc",LABEL=True,k=3)])
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>> On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gshy2...@gmail.com> wrote:
>>
>>> Hi Spark Users,
>>>     The following code snippet has an "attribute missing" error while
>>> the attribute exists.  This bug is  triggered by a particular sequence of
>>> of "select", "groupby" and "join".  Note that if I take away the "select"
>>> in #line B,  the code runs without error.   However, the "select" in #line
>>> B  includes all columns in the dataframe and hence should  not affect the
>>> final result.
>>>
>>>
>>> import pyspark.sql.functions as F
>>> df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,
>>> 'k':2},{'score':1.0,'ID':'abc','LABEL':False,'k':3}])
>>>
>>> df = df.withColumnRenamed("k","kk")\
>>>   .select("ID","score","LABEL","kk")    #line B
>>>
>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).f
>>> ilter(F.col("nL")>1)
>>> df = df.join(df_t.select("ID"),["ID"])
>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count",
>>> "cnt1")
>>> df = df.join(df_sw, ["ID","kk"])
>>>
>>
>>
>

Reply via email to