[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-27 Thread Andrew Snare (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267567#comment-16267567 ]

Andrew Snare commented on SPARK-22541:
--------------------------------------

Although this ship has sailed, a search brought me here while I was looking 
into this issue. Even though the query optimisation is intentional and its 
behaviour is known, something still doesn't feel right, since it can produce 
surprising results.

Given:
{code}
>>> import pyspark.sql.functions as sf
>>> from pyspark.sql.types import FloatType
>>> recip = sf.udf(lambda x: 1 / x, FloatType())
{code}

This fails:
{code}
>>> spark.createDataFrame([[0.0], [1.0]], ['value']) \
...  .select(sf.when(sf.col('value') > 0, recip('value'))).show()
[…]
ZeroDivisionError: float division by zero
{code}

This succeeds:
{code}
>>> spark.createDataFrame([[0.0], [1.0]], ['value']) \
...  .select(sf.when(sf.col('value') > 0, 1 / sf.col('value'))).show()
+------------------------------------------+
|CASE WHEN (value > 0) THEN (1 / value) END|
+------------------------------------------+
|                                      null|
|                                       1.0|
+------------------------------------------+
{code}

The Scala equivalents of _both_ succeed:
{code}
scala> val recip = udf((x: Float) => 1 / x)
scala> Seq(0.0, 1.0).toDF.select(when('value > 0, recip('value))).show()
+-----------------------------------------+
|CASE WHEN (value > 0) THEN UDF(value) END|
+-----------------------------------------+
|                                     null|
|                                      1.0|
+-----------------------------------------+
scala> Seq(0.0, 1.0).toDF.select(when('value > 0, lit(1) / 'value)).show()
+------------------------------------------+
|CASE WHEN (value > 0) THEN (1 / value) END|
+------------------------------------------+
|                                      null|
|                                       1.0|
+------------------------------------------+
{code}
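
For what it's worth, a workaround that matches this behaviour is to make the 
UDF itself defensive, so it tolerates rows the surrounding condition was meant 
to exclude. A minimal sketch ({{recip_safe}} is just an illustrative name):
{code}
>>> # guard inside the UDF, since it may be evaluated on every row
>>> recip_safe = sf.udf(lambda x: 1 / x if x else None, FloatType())
>>> spark.createDataFrame([[0.0], [1.0]], ['value']) \
...  .select(sf.when(sf.col('value') > 0, recip_safe('value'))).show()
{code}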


> Dataframes: applying multiple filters one after another using udfs and 
> accumulators results in faulty accumulators
> -----------------------------------------------------------------------
>
> Key: SPARK-22541
> URL: https://issues.apache.org/jira/browse/SPARK-22541
> Project: Spark
>  Issue Type: Documentation
>  Components: PySpark
>Affects Versions: 2.2.0
> Environment: pyspark 2.2.0, ubuntu
>Reporter: Janne K. Olesen
>Assignee: Liang-Chi Hsieh
> Fix For: 2.3.0
>
>
> I'm using UDF filters and accumulators to keep track of filtered rows in 
> dataframes.
> If I'm applying multiple filters one after the other, they seem to be 
> executed in parallel, not in sequence, which messes with the accumulators I'm 
> using to keep track of filtered data. 
> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)],
>                            ["key", "val1", "val2"])
> def __myfilter(val, acc):
>     if val < 2:
>         return True
>     else:
>         acc.add(1)
>         return False
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
> def myfilter1(val):
>     return __myfilter(val, acc1)
> def myfilter2(val):
>     return __myfilter(val, acc2)
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
> df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # |  b|   2|   2|
> # |  c|   3|   3|
> # +---+----+----+
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df.show()
> print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
> print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
> {code}






[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-20 Thread Liang-Chi Hsieh (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16260240#comment-16260240 ]

Liang-Chi Hsieh commented on SPARK-22541:
-----------------------------------------

Since this is known behavior, I will change this from a bug to a documentation issue.




[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-19 Thread Apache Spark (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258886#comment-16258886 ]

Apache Spark commented on SPARK-22541:
--------------------------------------

User 'viirya' has created a pull request for this issue:
https://github.com/apache/spark/pull/19787




[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-19 Thread Liang-Chi Hsieh (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258873#comment-16258873 ]

Liang-Chi Hsieh commented on SPARK-22541:
-----------------------------------------

As with the case of using Python UDFs with conditional expressions, I don't 
think we have an easy fix for this.

Btw, I think we should also explicitly note this behavior in the 
documentation.




[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-19 Thread Liang-Chi Hsieh (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258868#comment-16258868 ]

Liang-Chi Hsieh commented on SPARK-22541:
-----------------------------------------

Sorry, my previous reply is not completely correct.

This behavior is related to how PySpark runs Python UDFs. We can inspect the 
query plan of the {{df}}:

{code}
...
== Physical Plan ==
*Project [key#0, val1#1L, val2#2L]
+- *Filter (pythonUDF0#21 && pythonUDF1#22)
   +- BatchEvalPython [myfilter1(val1#1L), myfilter2(val2#2L)], [key#0, val1#1L, val2#2L, pythonUDF0#21, pythonUDF1#22]
      +- Scan ExistingRDD[key#0,val1#1L,val2#2L]
{code}

The Python UDFs are pushed down to a special physical operator, 
{{BatchEvalPython}}, for execution. Due to implementation details, the 
pushed-down Python UDFs are not evaluated conditionally. That is to say, they 
are evaluated on all rows, even when, logically, the original query only 
evaluates them on part of the rows via conditional expressions such as when 
or if. The issue you found here has the same root cause.
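
For reference, a quick way to see this for yourself (assuming the {{df}}, 
{{my_udf1}} and {{my_udf2}} from the reporter's script) is to print the 
physical plan:
{code}
>>> from pyspark.sql.functions import col
>>> # both UDF filters end up in a single BatchEvalPython node,
>>> # so each UDF is evaluated on every input row
>>> df.filter(my_udf1(col("val1"))).filter(my_udf2(col("val2"))).explain()
{code}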








[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-17 Thread Eric Maynard (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257004#comment-16257004 ]

Eric Maynard commented on SPARK-22541:
--------------------------------------

Yeah, this is a common problem when you have side effects in your 
transformations. If you need to enforce a specific order on your 
transformations or otherwise keep them separate (rather than letting Spark 
combine them), you can try putting barriers -- like repartitions -- between 
the transformation functions.
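
A sketch of that idea, using a cache plus a count as the barrier (names follow 
the reporter's script; whether this keeps the optimizer from combining the 
filters may depend on the Spark version):
{code}
from pyspark.sql.functions import col

# materialize the first filter's result so the second UDF filter
# only sees rows that survived the first one
df = df.filter(my_udf1(col("val1"))).cache()
df.count()  # action that forces evaluation
df = df.filter(my_udf2(col("val2")))
{code}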




[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-17 Thread Janne K. Olesen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256676#comment-16256676 ]

Janne K. Olesen commented on SPARK-22541:
-----------------------------------------

I agree, the filtered results are correct, but that is beside the point. It 
seems like query optimization does something like
{noformat}
for row in df:
    result_a = filter1(row)
    result_b = filter2(row)
    result = result_a && result_b
{noformat}

but in my opinion it should be 
{noformat}
for row in df:
    result_a = filter1(row)
    if result_a:
        result = filter2(row)
    else:
        result = False
{noformat}

If filter2 is executed regardless of the result of filter1, this can lead to 
strange errors. Consider the following example:
{code:title=Example.py}
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df_input = spark.createDataFrame([("a", None), ("b", 2), ("c", 3)], ["key", "val"])

# works as expected
df = df_input.filter(col("val").isNotNull())
df = df.filter(col("val") > 2)
df.show()

# this will raise an error and fail
# TypeError: '>' not supported between instances of 'NoneType' and 'int'
isNotNone = udf(lambda x: x is not None, BooleanType())
filter2 = udf(lambda x: x > 2, BooleanType())
df = df_input.filter(isNotNone(col("val")))
df = df.filter(filter2(col("val")))
df.show()
{code}
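
For completeness, one way to sidestep this in my example ({{safe_gt2}} is just 
an illustrative name) is to fold both checks into a single UDF, so the 
short-circuiting happens inside Python:
{code}
# Python's `and` short-circuits, so the comparison never sees None
safe_gt2 = udf(lambda x: x is not None and x > 2, BooleanType())
df = df_input.filter(safe_gt2(col("val")))
df.show()
{code}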






[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-16 Thread Liang-Chi Hsieh (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256441#comment-16256441 ]

Liang-Chi Hsieh commented on SPARK-22541:
-----------------------------------------

Due to query optimization, the two filters are combined and only one filter 
operation is actually performed. The filtered results are correct, so I don't 
think this is a bug.
