Hi Shiyuan,

I do not know whether I am right, but I would prefer to avoid expressions
in Spark as:

df = <<some transformation on df>>


Regards,
Gourav Sengupta

On Tue, Apr 10, 2018 at 10:42 PM, Shiyuan <gshy2...@gmail.com> wrote:

> Here is the pretty print of the physical plan which reveals some details
> about what causes the bug (see the lines highlighted in bold):
> WithColumnRenamed() fails to update the dependency graph correctly:
>
>
> 'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121
> in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s)
> with the same name appear in the operation: kk. Please check if the right
> attribute(s) are used
>
> Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
> +- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
>    :- Project [ID#64, score#67, LABEL#65, kk#73L]
>    :  +- Join Inner, (ID#64 = ID#99)
>    :     :- Project [ID#64, score#67, LABEL#65, kk#73L]
>    :     :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
>    :     :     +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
>    :     +- Project [ID#99]
>    :        +- Filter (nL#90L > cast(1 as bigint))
>    :           +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS
> nL#90L]
>    :              +- Project [ID#99, score#102, LABEL#100, kk#73L]
>    :                 +- Project [ID#99, LABEL#100, k#101L AS kk#73L,
> score#102]
>    :                    +- LogicalRDD [ID#99, LABEL#100, k#101L, score#102]
>    +- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
>       +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS
> count#118L]
>          +- Project [ID#135, score#138, LABEL#136, kk#128L]
>             +- Join Inner, (ID#135 = ID#99)
>                :- Project [ID#135, score#138, LABEL#136, kk#128L]
>                :  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L,
> score#138]*
>                :     +- LogicalRDD [ID#135, LABEL#136, k#137L, score#138]
>                +- Project [ID#99]
>                   +- Filter (nL#90L > cast(1 as bigint))
>                      +- Aggregate [ID#99], [ID#99, count(distinct
> LABEL#100) AS nL#90L]
>                         +- *!Project [ID#99, score#102, LABEL#100,
> kk#128L]*
>                            +-* Project [ID#99, LABEL#100, k#101L AS
> kk#73L, score#102]*
>                               +- LogicalRDD [ID#99, LABEL#100, k#101L,
> score#102]
>
> Here is the code which generates the error:
>
> import pyspark.sql.functions as F
> from pyspark.sql import Row
> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,
> k=2),Row(score=1.0,ID='abc',LABEL=False,k=3)]).
> withColumnRenamed("k","kk").select("ID","score","LABEL","kk")
> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("
> nL")).filter(F.col("nL")>1)
> df = df.join(df_t.select("ID"),["ID"])
> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
> df = df.join(df_sw, ["ID","kk"])
>
>
> On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan <gshy2...@gmail.com> wrote:
>
>> The spark warning about Row instead of Dict is not the culprit. The
>> problem still persists after I use Row instead of Dict to generate the
>> dataframe.
>>
>> Here is the expain() output regarding the reassignment of df as Gourav
>> suggests to run, They look the same except that  the serial numbers
>> following the columns are different(eg. ID#7273 vs. ID#7344).
>>
>> this is the output of df.explain() after df =
>> df.join(df_t.select("ID"),["ID"])
>> == Physical Plan == *(6) Project [ID#7273, score#7276, LABEL#7274,
>> kk#7281L] +- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner :- *(2) Sort
>> [ID#7273 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7273,
>> 200) : +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS
>> kk#7281L] : +- *(1) Filter isnotnull(ID#7273) : +- *(1) Scan
>> ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276] +- *(5) Sort [ID#7303
>> ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7303] +- *(5) Filter
>> (nL#7295L > 1) +- *(5) HashAggregate(keys=[ID#7303],
>> functions=[finalmerge_count(distinct merge count#7314L) AS
>> count(LABEL#7304)#7294L]) +- Exchange hashpartitioning(ID#7303, 200) +-
>> *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct
>> LABEL#7304) AS count#7314L]) +- *(4) HashAggregate(keys=[ID#7303,
>> LABEL#7304], functions=[]) +- Exchange hashpartitioning(ID#7303,
>> LABEL#7304, 200) +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304],
>> functions=[]) +- *(3) Project [ID#7303, LABEL#7304] +- *(3) Filter
>> isnotnull(ID#7303) +- *(3) Scan 
>> ExistingRDD[ID#7303,LABEL#7304,k#7305L,score#7306]
>>
>>
>> In comparison, this is the output of df1.explain() after  df1 =
>> df.join(df_t.select("ID"),["ID"])?
>> == Physical Plan == *(6) Project [ID#7344, score#7347, LABEL#7345,
>> kk#7352L] +- *(6) SortMergeJoin [ID#7344], [ID#7374], Inner :- *(2) Sort
>> [ID#7344 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7344,
>> 200) : +- *(1) Project [ID#7344, score#7347, LABEL#7345, k#7346L AS
>> kk#7352L] : +- *(1) Filter isnotnull(ID#7344) : +- *(1) Scan
>> ExistingRDD[ID#7344,LABEL#7345,k#7346L,score#7347] +- *(5) Sort [ID#7374
>> ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7374] +- *(5) Filter
>> (nL#7366L > 1) +- *(5) HashAggregate(keys=[ID#7374],
>> functions=[finalmerge_count(distinct merge count#7385L) AS
>> count(LABEL#7375)#7365L]) +- Exchange hashpartitioning(ID#7374, 200) +-
>> *(4) HashAggregate(keys=[ID#7374], functions=[partial_count(distinct
>> LABEL#7375) AS count#7385L]) +- *(4) HashAggregate(keys=[ID#7374,
>> LABEL#7375], functions=[]) +- Exchange hashpartitioning(ID#7374,
>> LABEL#7375, 200) +- *(3) HashAggregate(keys=[ID#7374, LABEL#7375],
>> functions=[]) +- *(3) Project [ID#7374, LABEL#7375] +- *(3) Filter
>> isnotnull(ID#7374) +- *(3) Scan ExistingRDD[ID#7374,LABEL#7375
>> ,k#7376L,score#7377]
>>
>>
>> Here is the code I run and the error I get in Spark 2.3.0. By looking at
>> the error,  the cause seems to be that  spark doesn't look up the column by
>> its name but by a serial number and  the serial number somehow is messed
>> up.
>>
>> import pyspark.sql.functions as F
>> from pyspark.sql import Row
>> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=
>> 2),Row(score=1.0,ID='abc',LABEL=False,k=3)])
>>
>> df = df.withColumnRenamed("k","kk").select("ID","score","LABEL","kk")
>> #line B
>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).
>> filter(F.col("nL")>1)
>> df = df.join(df_t.select("ID"),["ID"])
>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count",
>> "cnt1")
>> df = df.join(df_sw, ["ID","kk"])
>>
>> This is the error:
>> 'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121
>> in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s)
>> with the same name appear in the operation: kk. Please check if the right
>> attribute(s) are used.;;\nProject [ID#88, kk#96L, score#91, LABEL#89,
>> cnt1#140L]\n+- Join Inner, ((ID#88 = ID#150) && (kk#96L = kk#144L))\n :-
>> Project [ID#88, score#91, LABEL#89, kk#96L]\n : +- Join Inner, (ID#88 =
>> ID#118)\n : :- Project [ID#88, score#91, LABEL#89, kk#96L]\n : : +- Project
>> [ID#88, LABEL#89, k#90L AS kk#96L, score#91]\n : : +- LogicalRDD [ID#88,
>> LABEL#89, k#90L, score#91], false\n : +- Project [ID#118]\n : +- Filter
>> (nL#110L > cast(1 as bigint))\n : +- Aggregate [ID#118], [ID#118,
>> count(distinct LABEL#119) AS nL#110L]\n : +- Project [ID#118, score#121,
>> LABEL#119, kk#96L]\n : +- Project [ID#118, LABEL#119, k#120L AS kk#96L,
>> score#121]\n : +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121],
>> false\n +- Project [ID#150, kk#144L, count#134L AS cnt1#140L]\n +-
>> Aggregate [ID#150, kk#144L], [ID#150, kk#144L, count(1) AS count#134L]\n +-
>> Project [ID#150, score#153, LABEL#151, kk#144L]\n +- Join Inner, (ID#150 =
>> ID#118)\n :- Project [ID#150, score#153, LABEL#151, kk#144L]\n : +- Project
>> [ID#150, LABEL#151, k#152L AS kk#144L, score#153]\n : +- LogicalRDD
>> [ID#150, LABEL#151, k#152L, score#153], false\n +- Project [ID#118]\n +-
>> Filter (nL#110L > cast(1 as bigint))\n +- Aggregate [ID#118], [ID#118,
>> count(distinct LABEL#119) AS nL#110L]\n +- !Project [ID#118, score#121,
>> LABEL#119, kk#144L]\n +- Project [ID#118, LABEL#119, k#120L AS kk#96L,
>> score#121]\n +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false\n'
>>
>>
>>
>>
>> On Mon, Apr 9, 2018 at 3:21 PM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> what I am curious about is the reassignment of df.
>>>
>>> Can you please look into the explain plan of df after the statement df =
>>> df.join(df_t.select("ID"),["ID"])? And then compare with the explain
>>> plan of df1 after the statement df1 = df.join(df_t.select("ID"),["ID"])?
>>>
>>> Its late here, but I am yet to go through this completely.  But I think
>>> that SPARK does throw a warning mentioning us to use Row instead of
>>> Dictionary.
>>>
>>> It will be of help if you could kindly try using the below statement and
>>> go through your used case once again (I am yet to go through all the lines):
>>>
>>>
>>>
>>> from pyspark.sql import Row
>>>
>>> df = spark.createDataFrame([Row(score = 1.0,ID="abc",LABEL=True,k=2),
>>> Row(score = 1.0,ID="abc",LABEL=True,k=3)])
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>>
>>> On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gshy2...@gmail.com> wrote:
>>>
>>>> Hi Spark Users,
>>>>     The following code snippet has an "attribute missing" error while
>>>> the attribute exists.  This bug is  triggered by a particular sequence of
>>>> of "select", "groupby" and "join".  Note that if I take away the "select"
>>>> in #line B,  the code runs without error.   However, the "select" in #line
>>>> B  includes all columns in the dataframe and hence should  not affect the
>>>> final result.
>>>>
>>>>
>>>> import pyspark.sql.functions as F
>>>> df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,
>>>> 'k':2},{'score':1.0,'ID':'abc','LABEL':False,'k':3}])
>>>>
>>>> df = df.withColumnRenamed("k","kk")\
>>>>   .select("ID","score","LABEL","kk")    #line B
>>>>
>>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).f
>>>> ilter(F.col("nL")>1)
>>>> df = df.join(df_t.select("ID"),["ID"])
>>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count",
>>>> "cnt1")
>>>> df = df.join(df_sw, ["ID","kk"])
>>>>
>>>
>>>
>>
>

Reply via email to