[jira] [Created] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-16 Thread Janne K. Olesen (JIRA)
Janne K. Olesen created SPARK-22541:
---

 Summary: Dataframes: applying multiple filters one after another 
using udfs and accumulators results in faulty accumulators
 Key: SPARK-22541
 URL: https://issues.apache.org/jira/browse/SPARK-22541
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.2.0
 Environment: pyspark 2.2.0, ubuntu
Reporter: Janne K. Olesen


I'm using UDF filters and accumulators to keep track of filtered rows in 
DataFrames.

If I apply multiple filters one after the other, they seem to be executed 
together rather than in sequence, which corrupts the accumulator counts I'm 
using to keep track of the filtered data.

{code:title=example.py|borderStyle=solid}
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", "val1", "val2"])

def __myfilter(val, acc):
    if val < 2:
        return True
    else:
        acc.add(1)
        return False

acc1 = sc.accumulator(0)
acc2 = sc.accumulator(0)

def myfilter1(val):
    return __myfilter(val, acc1)

def myfilter2(val):
    return __myfilter(val, acc2)

my_udf1 = udf(myfilter1, BooleanType())
my_udf2 = udf(myfilter2, BooleanType())

df.show()
# +---+----+----+
# |key|val1|val2|
# +---+----+----+
# |  a|   1|   1|
# |  b|   2|   2|
# |  c|   3|   3|
# +---+----+----+

df = df.filter(my_udf1(col("val1")))
# df.show()
# +---+----+----+
# |key|val1|val2|
# +---+----+----+
# |  a|   1|   1|
# +---+----+----+
# expected acc1: 2
# expected acc2: 0

df = df.filter(my_udf2(col("val2")))
# df.show()
# +---+----+----+
# |key|val1|val2|
# +---+----+----+
# |  a|   1|   1|
# +---+----+----+
# expected acc1: 2
# expected acc2: 0
df.show()
print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
{code}
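
A possible workaround (an untested sketch reusing the names from the example 
above): force a materialization barrier between the two filters, so the first 
predicate is fully evaluated before the second one is planned. Note that 
accumulator updates made inside transformations are not guaranteed to be 
counted exactly once, so this only narrows the problem:
{code}
# sketch: cache and materialize after the first filter so the second filter
# runs against the cached result instead of being merged into one predicate
df = df.filter(my_udf1(col("val1"))).cache()
df.count()  # forces evaluation of the first filter, populating acc1
df = df.filter(my_udf2(col("val2")))
{code}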





[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-17 Thread Janne K. Olesen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16256676#comment-16256676 ]

Janne K. Olesen commented on SPARK-22541:
-

I agree that the filtered results are correct, but that is beside the point. It 
seems like the query optimizer does something like
{noformat}
for row in df:
    result_a = filter1(row)
    result_b = filter2(row)
    result = result_a and result_b
{noformat}

but in my opinion it should be
{noformat}
for row in df:
    result_a = filter1(row)
    if result_a:
        result = filter2(row)
    else:
        result = False
{noformat}
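
In the meantime, the only way I see to guarantee that ordering from user code 
is to combine both predicates into a single UDF, since Python's {{and}} 
evaluates left to right. A minimal, untested sketch reusing the myfilter1 and 
myfilter2 helpers from the original report:
{code}
# sketch (not from the original report): one UDF applies both predicates
# in order; Python's `and` short-circuits, so myfilter2 only runs for rows
# that already passed myfilter1
combined_udf = udf(lambda v1, v2: myfilter1(v1) and myfilter2(v2), BooleanType())
df = df.filter(combined_udf(col("val1"), col("val2")))
{code}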

If filter2 is executed regardless of the result of filter1, this can lead to 
strange errors. Consider the following example:
{code:title=Example.py}
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df_input = spark.createDataFrame([("a", None), ("b", 2), ("c", 3)], ["key", "val"])

# works as expected
df = df_input.filter(col("val").isNotNull())
df = df.filter(col("val") > 2)
df.show()

# this will raise an error and fail
# TypeError: '>' not supported between instances of 'NoneType' and 'int'
isNotNone = udf(lambda x: x is not None, BooleanType())
filter2 = udf(lambda x: x > 2, BooleanType())
df = df_input.filter(isNotNone(col("val")))
df = df.filter(filter2(col("val")))
df.show()
{code}
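
A defensive workaround for this particular failure (a sketch, assuming it is 
acceptable to duplicate the null check) is to fold the null test into the 
second UDF itself, so it cannot raise even when the optimizer evaluates it on 
rows the first filter was supposed to remove:
{code}
# sketch (hypothetical name filter2_safe, not from the report): a null-safe
# version of the second predicate; `and` short-circuits before the comparison
filter2_safe = udf(lambda x: x is not None and x > 2, BooleanType())
df = df_input.filter(isNotNone(col("val"))).filter(filter2_safe(col("val")))
df.show()
{code}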


