Hi Shiyuan,
can you show us the output of ¨explain¨ over df (as a last step)?

On 11 April 2018 at 19:47, Shiyuan <gshy2...@gmail.com> wrote:

> Variable name binding is a python thing, and Spark should not care how the
> variable is named. What matters is the dependency graph. Spark fails to
> handle this dependency graph correctly for which I am quite surprised: this
> is just a simple combination of three very common sql operations.
>
>
> On Tue, Apr 10, 2018 at 9:03 PM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi Shiyuan,
>>
>> I do not know whether I am right, but I would prefer to avoid expressions
>> in Spark as:
>>
>> df = <<some transformation on df>>
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Tue, Apr 10, 2018 at 10:42 PM, Shiyuan <gshy2...@gmail.com> wrote:
>>
>>> Here is the pretty print of the physical plan which reveals some details
>>> about what causes the bug (see the lines highlighted in bold):
>>> WithColumnRenamed() fails to update the dependency graph correctly:
>>>
>>>
>>> 'Resolved attribute(s) kk#144L missing from
>>> ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118,
>>> score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear in
>>> the operation: kk. Please check if the right attribute(s) are used
>>>
>>> Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
>>> +- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
>>>    :- Project [ID#64, score#67, LABEL#65, kk#73L]
>>>    :  +- Join Inner, (ID#64 = ID#99)
>>>    :     :- Project [ID#64, score#67, LABEL#65, kk#73L]
>>>    :     :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
>>>    :     :     +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
>>>    :     +- Project [ID#99]
>>>    :        +- Filter (nL#90L > cast(1 as bigint))
>>>    :           +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100)
>>> AS nL#90L]
>>>    :              +- Project [ID#99, score#102, LABEL#100, kk#73L]
>>>    :                 +- Project [ID#99, LABEL#100, k#101L AS kk#73L,
>>> score#102]
>>>    :                    +- LogicalRDD [ID#99, LABEL#100, k#101L,
>>> score#102]
>>>    +- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
>>>       +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS
>>> count#118L]
>>>          +- Project [ID#135, score#138, LABEL#136, kk#128L]
>>>             +- Join Inner, (ID#135 = ID#99)
>>>                :- Project [ID#135, score#138, LABEL#136, kk#128L]
>>>                :  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L,
>>> score#138]*
>>>                :     +- LogicalRDD [ID#135, LABEL#136, k#137L, score#138]
>>>                +- Project [ID#99]
>>>                   +- Filter (nL#90L > cast(1 as bigint))
>>>                      +- Aggregate [ID#99], [ID#99, count(distinct
>>> LABEL#100) AS nL#90L]
>>>                         +- *!Project [ID#99, score#102, LABEL#100,
>>> kk#128L]*
>>>                            +-* Project [ID#99, LABEL#100, k#101L AS
>>> kk#73L, score#102]*
>>>                               +- LogicalRDD [ID#99, LABEL#100, k#101L,
>>> score#102]
>>>
>>> Here is the code which generates the error:
>>>
>>> import pyspark.sql.functions as F
>>> from pyspark.sql import Row
>>> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2
>>> ),Row(score=1.0,ID='abc',LABEL=False,k=3)]).withColumnRename
>>> d("k","kk").select("ID","score","LABEL","kk")
>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).f
>>> ilter(F.col("nL")>1)
>>> df = df.join(df_t.select("ID"),["ID"])
>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count",
>>> "cnt1")
>>> df = df.join(df_sw, ["ID","kk"])
>>>
>>>
>>> On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan <gshy2...@gmail.com> wrote:
>>>
>>>> The spark warning about Row instead of Dict is not the culprit. The
>>>> problem still persists after I use Row instead of Dict to generate the
>>>> dataframe.
>>>>
>>>> Here is the expain() output regarding the reassignment of df as Gourav
>>>> suggests to run, They look the same except that  the serial numbers
>>>> following the columns are different(eg. ID#7273 vs. ID#7344).
>>>>
>>>> this is the output of df.explain() after df =
>>>> df.join(df_t.select("ID"),["ID"])
>>>> == Physical Plan == *(6) Project [ID#7273, score#7276, LABEL#7274,
>>>> kk#7281L] +- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner :- *(2) Sort
>>>> [ID#7273 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7273,
>>>> 200) : +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS
>>>> kk#7281L] : +- *(1) Filter isnotnull(ID#7273) : +- *(1) Scan
>>>> ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276] +- *(5) Sort
>>>> [ID#7303 ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7303] +- *(5)
>>>> Filter (nL#7295L > 1) +- *(5) HashAggregate(keys=[ID#7303],
>>>> functions=[finalmerge_count(distinct merge count#7314L) AS
>>>> count(LABEL#7304)#7294L]) +- Exchange hashpartitioning(ID#7303, 200) +-
>>>> *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct
>>>> LABEL#7304) AS count#7314L]) +- *(4) HashAggregate(keys=[ID#7303,
>>>> LABEL#7304], functions=[]) +- Exchange hashpartitioning(ID#7303,
>>>> LABEL#7304, 200) +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304],
>>>> functions=[]) +- *(3) Project [ID#7303, LABEL#7304] +- *(3) Filter
>>>> isnotnull(ID#7303) +- *(3) Scan 
>>>> ExistingRDD[ID#7303,LABEL#7304,k#7305L,score#7306]
>>>>
>>>>
>>>> In comparison, this is the output of df1.explain() after  df1 =
>>>> df.join(df_t.select("ID"),["ID"])?
>>>> == Physical Plan == *(6) Project [ID#7344, score#7347, LABEL#7345,
>>>> kk#7352L] +- *(6) SortMergeJoin [ID#7344], [ID#7374], Inner :- *(2) Sort
>>>> [ID#7344 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7344,
>>>> 200) : +- *(1) Project [ID#7344, score#7347, LABEL#7345, k#7346L AS
>>>> kk#7352L] : +- *(1) Filter isnotnull(ID#7344) : +- *(1) Scan
>>>> ExistingRDD[ID#7344,LABEL#7345,k#7346L,score#7347] +- *(5) Sort
>>>> [ID#7374 ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7374] +- *(5)
>>>> Filter (nL#7366L > 1) +- *(5) HashAggregate(keys=[ID#7374],
>>>> functions=[finalmerge_count(distinct merge count#7385L) AS
>>>> count(LABEL#7375)#7365L]) +- Exchange hashpartitioning(ID#7374, 200) +-
>>>> *(4) HashAggregate(keys=[ID#7374], functions=[partial_count(distinct
>>>> LABEL#7375) AS count#7385L]) +- *(4) HashAggregate(keys=[ID#7374,
>>>> LABEL#7375], functions=[]) +- Exchange hashpartitioning(ID#7374,
>>>> LABEL#7375, 200) +- *(3) HashAggregate(keys=[ID#7374, LABEL#7375],
>>>> functions=[]) +- *(3) Project [ID#7374, LABEL#7375] +- *(3) Filter
>>>> isnotnull(ID#7374) +- *(3) Scan ExistingRDD[ID#7374,LABEL#7375
>>>> ,k#7376L,score#7377]
>>>>
>>>>
>>>> Here is the code I run and the error I get in Spark 2.3.0. By looking
>>>> at the error,  the cause seems to be that  spark doesn't look up the column
>>>> by its name but by a serial number and  the serial number somehow is messed
>>>> up.
>>>>
>>>> import pyspark.sql.functions as F
>>>> from pyspark.sql import Row
>>>> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2
>>>> ),Row(score=1.0,ID='abc',LABEL=False,k=3)])
>>>>
>>>> df = df.withColumnRenamed("k","kk").select("ID","score","LABEL","kk")
>>>>   #line B
>>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).f
>>>> ilter(F.col("nL")>1)
>>>> df = df.join(df_t.select("ID"),["ID"])
>>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count",
>>>> "cnt1")
>>>> df = df.join(df_sw, ["ID","kk"])
>>>>
>>>> This is the error:
>>>> 'Resolved attribute(s) kk#144L missing from
>>>> ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118,
>>>> score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear in
>>>> the operation: kk. Please check if the right attribute(s) are
>>>> used.;;\nProject [ID#88, kk#96L, score#91, LABEL#89, cnt1#140L]\n+- Join
>>>> Inner, ((ID#88 = ID#150) && (kk#96L = kk#144L))\n :- Project [ID#88,
>>>> score#91, LABEL#89, kk#96L]\n : +- Join Inner, (ID#88 = ID#118)\n : :-
>>>> Project [ID#88, score#91, LABEL#89, kk#96L]\n : : +- Project [ID#88,
>>>> LABEL#89, k#90L AS kk#96L, score#91]\n : : +- LogicalRDD [ID#88, LABEL#89,
>>>> k#90L, score#91], false\n : +- Project [ID#118]\n : +- Filter (nL#110L >
>>>> cast(1 as bigint))\n : +- Aggregate [ID#118], [ID#118, count(distinct
>>>> LABEL#119) AS nL#110L]\n : +- Project [ID#118, score#121, LABEL#119,
>>>> kk#96L]\n : +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]\n :
>>>> +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false\n +- Project
>>>> [ID#150, kk#144L, count#134L AS cnt1#140L]\n +- Aggregate [ID#150,
>>>> kk#144L], [ID#150, kk#144L, count(1) AS count#134L]\n +- Project [ID#150,
>>>> score#153, LABEL#151, kk#144L]\n +- Join Inner, (ID#150 = ID#118)\n :-
>>>> Project [ID#150, score#153, LABEL#151, kk#144L]\n : +- Project [ID#150,
>>>> LABEL#151, k#152L AS kk#144L, score#153]\n : +- LogicalRDD [ID#150,
>>>> LABEL#151, k#152L, score#153], false\n +- Project [ID#118]\n +- Filter
>>>> (nL#110L > cast(1 as bigint))\n +- Aggregate [ID#118], [ID#118,
>>>> count(distinct LABEL#119) AS nL#110L]\n +- !Project [ID#118, score#121,
>>>> LABEL#119, kk#144L]\n +- Project [ID#118, LABEL#119, k#120L AS kk#96L,
>>>> score#121]\n +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false\n'
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Apr 9, 2018 at 3:21 PM, Gourav Sengupta <
>>>> gourav.sengu...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> what I am curious about is the reassignment of df.
>>>>>
>>>>> Can you please look into the explain plan of df after the statement df
>>>>> = df.join(df_t.select("ID"),["ID"])? And then compare with the
>>>>> explain plan of df1 after the statement df1 = 
>>>>> df.join(df_t.select("ID"),["ID
>>>>> "])?
>>>>>
>>>>> Its late here, but I am yet to go through this completely.  But I
>>>>> think that SPARK does throw a warning mentioning us to use Row instead of
>>>>> Dictionary.
>>>>>
>>>>> It will be of help if you could kindly try using the below statement
>>>>> and go through your used case once again (I am yet to go through all the
>>>>> lines):
>>>>>
>>>>>
>>>>>
>>>>> from pyspark.sql import Row
>>>>>
>>>>> df = spark.createDataFrame([Row(score = 1.0,ID="abc",LABEL=True,k=2),
>>>>> Row(score = 1.0,ID="abc",LABEL=True,k=3)])
>>>>>
>>>>> Regards,
>>>>> Gourav Sengupta
>>>>>
>>>>>
>>>>> On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gshy2...@gmail.com> wrote:
>>>>>
>>>>>> Hi Spark Users,
>>>>>>     The following code snippet has an "attribute missing" error while
>>>>>> the attribute exists.  This bug is  triggered by a particular sequence of
>>>>>> of "select", "groupby" and "join".  Note that if I take away the "select"
>>>>>> in #line B,  the code runs without error.   However, the "select" in 
>>>>>> #line
>>>>>> B  includes all columns in the dataframe and hence should  not affect the
>>>>>> final result.
>>>>>>
>>>>>>
>>>>>> import pyspark.sql.functions as F
>>>>>> df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,
>>>>>> 'k':2},{'score':1.0,'ID':'abc','LABEL':False,'k':3}])
>>>>>>
>>>>>> df = df.withColumnRenamed("k","kk")\
>>>>>>   .select("ID","score","LABEL","kk")    #line B
>>>>>>
>>>>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).f
>>>>>> ilter(F.col("nL")>1)
>>>>>> df = df.join(df_t.select("ID"),["ID"])
>>>>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count",
>>>>>> "cnt1")
>>>>>> df = df.join(df_sw, ["ID","kk"])
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to