Re: [DISCUSS] Expensive deterministic UDFs

2019-11-07 Thread Wenchen Fan
We really need some documentation to define what non-deterministic means.
AFAIK, a non-deterministic expression may produce a different result for the
same input row, depending on which input rows have already been processed.

The optimizer tries its best not to change the input sequence
of non-deterministic expressions. For example, `df.select(...,
nonDeterministicExpr).filter...` can't do filter pushdown. An exception is the
filter condition. For `df.filter(nonDeterministic && cond)`, Spark still
pushes down `cond` even if it may change the input sequence of the first
condition. This is to respect the SQL semantics that filter conditions ANDed
together are order-insensitive. Users should write
`df.filter(nonDeterministic).filter(cond)` to guarantee the order.
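
A minimal sketch of the two forms, assuming a DataFrame `df` with a numeric
`id` column and using `rand()` as a stand-in for a non-deterministic predicate:

    import org.apache.spark.sql.functions.{col, rand}

    val nonDeterministic = rand() < 0.5  // stand-in for any non-deterministic predicate
    val cond = col("id") > 10

    // ANDed together: Spark may still push `cond` below the other predicate,
    // so the non-deterministic predicate may see a different input sequence.
    df.filter(nonDeterministic && cond)

    // Chained filters: `cond` only sees rows that already passed the
    // non-deterministic predicate, so the evaluation order is guaranteed.
    df.filter(nonDeterministic).filter(cond)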

For this particular problem, I think it's not only about UDFs, but a general
problem with how Spark collapses Projects.
For example, Spark optimizes `df.select('a * 5 as 'b).select('b + 2, 'b + 3)`
to `df.select('a * 5 + 2, 'a * 5 + 3)`, and executes `'a * 5` twice.
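
A quick way to see this, assuming a SparkSession `spark` and a DataFrame `df`
with an integer column `a`, is to look at the optimized plan:

    import spark.implicits._

    val collapsed = df.select(('a * 5).as("b")).select('b + 2, 'b + 3)
    collapsed.explain(true)
    // The optimized logical plan contains a single Project computing both
    // ((a * 5) + 2) and ((a * 5) + 3), i.e. `'a * 5` appears twice and is
    // evaluated twice per row.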

I think we should revisit this optimization and think about when we can
collapse.

On Thu, Nov 7, 2019 at 6:20 PM Rubén Berenguel  wrote:

> That was very interesting, thanks Enrico.
>
> Sean, IIRC it also prevents push down of the UDF in Catalyst in some cases.
>
> Regards,
>
> Ruben
>
> > On 7 Nov 2019, at 11:09, Sean Owen  wrote:
> >
> > Interesting, what does non-deterministic do except have this effect?
> > aside from the naming, it could be a fine use of this flag if that's
> > all it effectively does. I'm not sure I'd introduce another flag with
> > the same semantics just over naming. If anything 'expensive' also
> > isn't the right word, more like 'try not to evaluate multiple times'.
> >
> > Why isn't caching the answer? I realize it's big, but you can cache to
> > disk. This may be faster than whatever plan reordering has to happen
> > to evaluate once.
> >
> > Usually I'd say, can you redesign your UDF and code to be more
> > efficient too? or use a bigger cluster if that's really what you need.
> >
> > At first look, no I don't think this Spark-side workaround for naming
> > for your use case is worthwhile. There are existing better solutions.
> >
> > On Thu, Nov 7, 2019 at 2:45 AM Enrico Minack 
> wrote:
> >>
> >> Hi all,
> >>
> >> Running expensive deterministic UDFs that return complex types,
> followed by multiple references to those results, causes Spark to evaluate
> the UDF multiple times per row. This has been reported and discussed
> before: SPARK-18748 and SPARK-17728.
> >>
> >>val f: Int => Array[Int]
> >>val udfF = udf(f)
> >>df
> >>  .select($"id", udfF($"id").as("array"))
> >>  .select($"array"(0).as("array0"), $"array"(1).as("array1"))
> >>
> >> A common approach to make Spark evaluate the UDF only once is to cache
> the intermediate result right after projecting the UDF:
> >>
> >>df
> >>  .select($"id", udfF($"id").as("array"))
> >>  .cache()
> >>  .select($"array"(0).as("array0"), $"array"(1).as("array1"))
> >>
> >> There are scenarios where this intermediate result is too big for the
> cluster to cache. Also, this is bad design.
> >>
> >> The best approach is to mark the UDF as non-deterministic. Then Spark
> optimizes the query in a way that the UDF gets called only once per row,
> exactly what you want.
> >>
> >>val udfF = udf(f).asNondeterministic()
> >>
> >> However, stating that a UDF is non-deterministic even though it clearly is
> deterministic is counter-intuitive and makes your code harder to read.
> >>
> >> Spark should provide a better way to flag the UDF. Calling it expensive
> would be a better name here.
> >>
> >>val udfF = udf(f).asExpensive()
> >>
> >> I understand that deterministic is a notion that Expression provides,
> and there is no equivalent to expensive that is understood by the
> optimizer. However, asExpensive() could just set
> ScalaUDF.udfDeterministic = deterministic && !expensive, which implements
> the best available approach behind a better name.
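
A minimal sketch of what such a method could look like today, as a hypothetical
enrichment of UserDefinedFunction (not an existing Spark API) that simply falls
back to the non-deterministic flag:

    import org.apache.spark.sql.expressions.UserDefinedFunction
    import org.apache.spark.sql.functions.udf

    // Hypothetical helper, not part of Spark: `asExpensive` reuses the
    // existing non-deterministic flag under a more descriptive name.
    implicit class ExpensiveUdf(self: UserDefinedFunction) {
      def asExpensive(): UserDefinedFunction = self.asNondeterministic()
    }

    val udfF = udf(f).asExpensive()  // `f: Int => Array[Int]` as in the example above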
> >>
> >> What are your thoughts on asExpensive()?
> >>
> >> Regards,
> >> Enrico
> >
> >
>
>
>
>


Re: [DISCUSS] Remove sorting of fields in PySpark SQL Row construction

2019-11-07 Thread Takuya UESHIN
+1

On Thu, Nov 7, 2019 at 6:54 PM Shane Knapp  wrote:

> +1
>
> On Thu, Nov 7, 2019 at 6:08 PM Hyukjin Kwon  wrote:
> >
> > +1
> >
> > On Wed, Nov 6, 2019 at 11:38 PM Wenchen Fan  wrote:
> >>
> >> Sounds reasonable to me. We should make the behavior consistent within
> Spark.
> >>
> >> On Tue, Nov 5, 2019 at 6:29 AM Bryan Cutler  wrote:
> >>>
> >>> Currently, when a PySpark Row is created with keyword arguments, the
> fields are sorted alphabetically. This has created a lot of confusion with
> users because it is not obvious (although it is stated in the pydocs) that
> they will be sorted alphabetically. Then later when applying a schema and
> the field order does not match, an error will occur. Here is a list of some
> of the JIRAs that I have been tracking all related to this issue:
> SPARK-24915, SPARK-22232, SPARK-27939, SPARK-27712, and relevant discussion
> of the issue [1].
> >>>
> >>> The original reason for sorting fields is that kwargs in Python <
> 3.6 are not guaranteed to be in the same order that they were entered [2].
> Sorting alphabetically ensures a consistent order. Matters are further
> complicated by the flag __from_dict__ that allows the Row fields to be
> referenced by name when made by kwargs, but this flag is not serialized
> with the Row and leads to inconsistent behavior. For instance:
> >>>
> >>> >>> spark.createDataFrame([Row(A="1", B="2")], "B string, A
> string").first()
> >>> Row(B='2', A='1')
> >>> >>> spark.createDataFrame(spark.sparkContext.parallelize([Row(A="1",
> B="2")]), "B string, A string").first()
> >>> Row(B='1', A='2')
> >>>
> >>> I think the best way to fix this is to remove the sorting of fields
> when constructing a Row. For users with Python 3.6+, nothing would change
> because these versions of Python ensure that the kwargs stay in the
> order entered. For users with Python < 3.6, using kwargs would check a
> conf to either raise an error or fall back to a LegacyRow that sorts the
> fields as before. With Python < 3.6 being deprecated now, this LegacyRow
> can also be removed at the same time. There are also other ways to create
> Rows that will not be affected. I have opened a JIRA [3] to capture this,
> but I am wondering what others think about fixing this for Spark 3.0?
> >>>
> >>> [1] https://github.com/apache/spark/pull/20280
> >>> [2] https://www.python.org/dev/peps/pep-0468/
> >>> [3] https://issues.apache.org/jira/browse/SPARK-29748
>
>
>
> --
> Shane Knapp
> UC Berkeley EECS Research / RISELab Staff Technical Lead
> https://rise.cs.berkeley.edu
>
>
>

-- 
Takuya UESHIN
Tokyo, Japan

http://twitter.com/ueshin


Re: [DISCUSS] Remove sorting of fields in PySpark SQL Row construction

2019-11-07 Thread Shane Knapp
+1

On Thu, Nov 7, 2019 at 6:08 PM Hyukjin Kwon  wrote:
>
> +1
>
> On Wed, Nov 6, 2019 at 11:38 PM Wenchen Fan  wrote:
>>
>> Sounds reasonable to me. We should make the behavior consistent within Spark.
>>
>> On Tue, Nov 5, 2019 at 6:29 AM Bryan Cutler  wrote:
>>>
>>> Currently, when a PySpark Row is created with keyword arguments, the fields 
>>> are sorted alphabetically. This has created a lot of confusion with users 
>>> because it is not obvious (although it is stated in the pydocs) that they 
>>> will be sorted alphabetically. Then later when applying a schema and the 
>>> field order does not match, an error will occur. Here is a list of some of 
>>> the JIRAs that I have been tracking all related to this issue: SPARK-24915, 
>>> SPARK-22232, SPARK-27939, SPARK-27712, and relevant discussion of the issue 
>>> [1].
>>>
>>> The original reason for sorting fields is that kwargs in Python < 3.6 
>>> are not guaranteed to be in the same order that they were entered [2]. 
>>> Sorting alphabetically ensures a consistent order. Matters are further 
>>> complicated by the flag __from_dict__ that allows the Row fields to be 
>>> referenced by name when made by kwargs, but this flag is not serialized 
>>> with the Row and leads to inconsistent behavior. For instance:
>>>
>>> >>> spark.createDataFrame([Row(A="1", B="2")], "B string, A string").first()
>>> Row(B='2', A='1')
>>> >>> spark.createDataFrame(spark.sparkContext.parallelize([Row(A="1", 
>>> >>> B="2")]), "B string, A string").first()
>>> Row(B='1', A='2')
>>>
>>> I think the best way to fix this is to remove the sorting of fields when 
>>> constructing a Row. For users with Python 3.6+, nothing would change 
>>> because these versions of Python ensure that the kwargs stay in the 
>>> order entered. For users with Python < 3.6, using kwargs would check a 
>>> conf to either raise an error or fall back to a LegacyRow that sorts the 
>>> fields as before. With Python < 3.6 being deprecated now, this LegacyRow 
>>> can also be removed at the same time. There are also other ways to create 
>>> Rows that will not be affected. I have opened a JIRA [3] to capture this, 
>>> but I am wondering what others think about fixing this for Spark 3.0?
>>>
>>> [1] https://github.com/apache/spark/pull/20280
>>> [2] https://www.python.org/dev/peps/pep-0468/
>>> [3] https://issues.apache.org/jira/browse/SPARK-29748



-- 
Shane Knapp
UC Berkeley EECS Research / RISELab Staff Technical Lead
https://rise.cs.berkeley.edu




Re: [DISCUSS] Remove sorting of fields in PySpark SQL Row construction

2019-11-07 Thread Hyukjin Kwon
+1

On Wed, Nov 6, 2019 at 11:38 PM Wenchen Fan  wrote:

> Sounds reasonable to me. We should make the behavior consistent within
> Spark.
>
> On Tue, Nov 5, 2019 at 6:29 AM Bryan Cutler  wrote:
>
>> Currently, when a PySpark Row is created with keyword arguments, the
>> fields are sorted alphabetically. This has created a lot of confusion with
>> users because it is not obvious (although it is stated in the pydocs) that
>> they will be sorted alphabetically. Then later when applying a schema and
>> the field order does not match, an error will occur. Here is a list of some
>> of the JIRAs that I have been tracking all related to this issue:
>> SPARK-24915, SPARK-22232, SPARK-27939, SPARK-27712, and relevant discussion
>> of the issue [1].
>>
>> The original reason for sorting fields is that kwargs in Python < 3.6
>> are not guaranteed to be in the same order that they were entered [2].
>> Sorting alphabetically ensures a consistent order. Matters are further
>> complicated by the flag __from_dict__ that allows the Row fields to
>> be referenced by name when made by kwargs, but this flag is not serialized
>> with the Row and leads to inconsistent behavior. For instance:
>>
>> >>> spark.createDataFrame([Row(A="1", B="2")], "B string, A string").first()
>> Row(B='2', A='1')
>> >>> spark.createDataFrame(spark.sparkContext.parallelize([Row(A="1", B="2")]),
>> "B string, A string").first()
>> Row(B='1', A='2')
>>
>> I think the best way to fix this is to remove the sorting of fields when
>> constructing a Row. For users with Python 3.6+, nothing would change
>> because these versions of Python ensure that the kwargs stay in the
>> order entered. For users with Python < 3.6, using kwargs would check a
>> conf to either raise an error or fall back to a LegacyRow that sorts the
>> fields as before. With Python < 3.6 being deprecated now, this LegacyRow
>> can also be removed at the same time. There are also other ways to create
>> Rows that will not be affected. I have opened a JIRA [3] to capture this,
>> but I am wondering what others think about fixing this for Spark 3.0?
>>
>> [1] https://github.com/apache/spark/pull/20280
>> [2] https://www.python.org/dev/peps/pep-0468/
>> [3] https://issues.apache.org/jira/browse/SPARK-29748
>>
>>
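
For reference, a small sketch of one way to sidestep the kwargs ordering issue
entirely today: using Row as a factory with explicit field names preserves the
declared order on any Python version (assuming a running SparkSession `spark`):

    from pyspark.sql import Row

    # Declare the field order once; positional values then follow that order,
    # so no alphabetical sorting is involved.
    MyRow = Row("A", "B")
    spark.createDataFrame([MyRow("1", "2")], "A string, B string").first()
    # Row(A='1', B='2')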


[ANNOUNCE] Announcing Apache Spark 3.0.0-preview

2019-11-07 Thread Xingbo Jiang
Hi all,

To enable wide-scale community testing of the upcoming Spark 3.0 release,
the Apache Spark community has posted a preview release of Spark 3.0. This
preview is *not a stable release in terms of either API or functionality*,
but it is meant to give the community early access to try the code that
will become Spark 3.0. If you would like to test the release, please
download it, and send feedback using either the mailing lists or JIRA.

There are a lot of exciting new features added to Spark 3.0, including
Dynamic Partition Pruning, Adaptive Query Execution, Accelerator-aware
Scheduling, Data Source API with Catalog Supports, Vectorization in SparkR,
support of Hadoop 3/JDK 11/Scala 2.12, and many more. For a full list of
major features and changes in Spark 3.0.0-preview, please check the thread:
http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-3-0-preview-release-feature-list-and-major-changes-td28050.html

We'd like to thank our contributors and users for their contributions and
early feedback to this release. This release would not have been possible
without you.

To download Spark 3.0.0-preview, head over to the download page:
https://archive.apache.org/dist/spark/spark-3.0.0-preview

Thanks,

Xingbo


Re: [DISCUSS] Expensive deterministic UDFs

2019-11-07 Thread Rubén Berenguel
That was very interesting, thanks Enrico.

Sean, IIRC it also prevents push down of the UDF in Catalyst in some cases.

Regards, 

Ruben

> On 7 Nov 2019, at 11:09, Sean Owen  wrote:
> 
> Interesting, what does non-deterministic do except have this effect?
> aside from the naming, it could be a fine use of this flag if that's
> all it effectively does. I'm not sure I'd introduce another flag with
> the same semantics just over naming. If anything 'expensive' also
> isn't the right word, more like 'try not to evaluate multiple times'.
> 
> Why isn't caching the answer? I realize it's big, but you can cache to
> disk. This may be faster than whatever plan reordering has to happen
> to evaluate once.
> 
> Usually I'd say, can you redesign your UDF and code to be more
> efficient too? or use a bigger cluster if that's really what you need.
> 
> At first look, no I don't think this Spark-side workaround for naming
> for your use case is worthwhile. There are existing better solutions.
> 
> On Thu, Nov 7, 2019 at 2:45 AM Enrico Minack  wrote:
>> 
>> Hi all,
>> 
>> Running expensive deterministic UDFs that return complex types, followed by 
>> multiple references to those results, causes Spark to evaluate the UDF 
>> multiple times per row. This has been reported and discussed before: 
>> SPARK-18748 and SPARK-17728.
>> 
>>val f: Int => Array[Int]
>>val udfF = udf(f)
>>df
>>  .select($"id", udfF($"id").as("array"))
>>  .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>> 
>> A common approach to make Spark evaluate the UDF only once is to cache the 
>> intermediate result right after projecting the UDF:
>> 
>>df
>>  .select($"id", udfF($"id").as("array"))
>>  .cache()
>>  .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>> 
>> There are scenarios where this intermediate result is too big for the 
>> cluster to cache. Also, this is bad design.
>> 
>> The best approach is to mark the UDF as non-deterministic. Then Spark 
>> optimizes the query in a way that the UDF gets called only once per row, 
>> exactly what you want.
>> 
>>val udfF = udf(f).asNondeterministic()
>> 
>> However, stating that a UDF is non-deterministic even though it clearly is 
>> deterministic is counter-intuitive and makes your code harder to read.
>> 
>> Spark should provide a better way to flag the UDF. Calling it expensive 
>> would be a better name here.
>> 
>>val udfF = udf(f).asExpensive()
>> 
>> I understand that deterministic is a notion that Expression provides, and 
>> there is no equivalent to expensive that is understood by the optimizer. 
>> However, asExpensive() could just set ScalaUDF.udfDeterministic = 
>> deterministic && !expensive, which implements the best available approach 
>> behind a better name.
>> 
>> What are your thoughts on asExpensive()?
>> 
>> Regards,
>> Enrico
> 
> 





Re: [DISCUSS] Expensive deterministic UDFs

2019-11-07 Thread Sean Owen
Interesting, what does non-deterministic do except have this effect?
aside from the naming, it could be a fine use of this flag if that's
all it effectively does. I'm not sure I'd introduce another flag with
the same semantics just over naming. If anything 'expensive' also
isn't the right word, more like 'try not to evaluate multiple times'.

Why isn't caching the answer? I realize it's big, but you can cache to
disk. This may be faster than whatever plan reordering has to happen
to evaluate once.
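
For reference, a minimal sketch of caching the intermediate result to disk only,
reusing `df` and `udfF` from the quoted example below:

    import org.apache.spark.storage.StorageLevel
    import spark.implicits._  // assumes a SparkSession named `spark`, as in spark-shell

    df
      .select($"id", udfF($"id").as("array"))
      .persist(StorageLevel.DISK_ONLY)  // keep it out of memory, spill to local disk
      .select($"array"(0).as("array0"), $"array"(1).as("array1"))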

Usually I'd say, can you redesign your UDF and code to be more
efficient too? or use a bigger cluster if that's really what you need.

At first look, no I don't think this Spark-side workaround for naming
for your use case is worthwhile. There are existing better solutions.

On Thu, Nov 7, 2019 at 2:45 AM Enrico Minack  wrote:
>
> Hi all,
>
> Running expensive deterministic UDFs that return complex types, followed by 
> multiple references to those results, causes Spark to evaluate the UDF multiple 
> times per row. This has been reported and discussed before: SPARK-18748 
> and SPARK-17728.
>
> val f: Int => Array[Int]
> val udfF = udf(f)
> df
>   .select($"id", udfF($"id").as("array"))
>   .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>
> A common approach to make Spark evaluate the UDF only once is to cache the 
> intermediate result right after projecting the UDF:
>
> df
>   .select($"id", udfF($"id").as("array"))
>   .cache()
>   .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>
> There are scenarios where this intermediate result is too big for the cluster 
> to cache. Also, this is bad design.
>
> The best approach is to mark the UDF as non-deterministic. Then Spark 
> optimizes the query in a way that the UDF gets called only once per row, 
> exactly what you want.
>
> val udfF = udf(f).asNondeterministic()
>
> However, stating that a UDF is non-deterministic even though it clearly is 
> deterministic is counter-intuitive and makes your code harder to read.
>
> Spark should provide a better way to flag the UDF. Calling it expensive would 
> be a better name here.
>
> val udfF = udf(f).asExpensive()
>
> I understand that deterministic is a notion that Expression provides, and 
> there is no equivalent to expensive that is understood by the optimizer. 
> However, asExpensive() could just set ScalaUDF.udfDeterministic = 
> deterministic && !expensive, which implements the best available approach 
> behind a better name.
>
> What are your thoughts on asExpensive()?
>
> Regards,
> Enrico
