Janne K. Olesen created SPARK-22541:
---------------------------------------

             Summary: Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators
                 Key: SPARK-22541
                 URL: https://issues.apache.org/jira/browse/SPARK-22541
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.2.0
         Environment: pyspark 2.2.0, ubuntu
            Reporter: Janne K. Olesen


I'm using UDF filters and accumulators to keep track of filtered rows in DataFrames.

If I apply multiple filters one after the other, they seem to be executed in parallel rather than in sequence, which breaks the accumulators I'm using to keep track of the filtered data.

{code:title=example.py|borderStyle=solid}
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", "val1", "val2"])

def __myfilter(val, acc):
    # keep rows with val < 2; count filtered-out rows in the accumulator
    if val < 2:
        return True
    acc.add(1)
    return False

acc1 = sc.accumulator(0)
acc2 = sc.accumulator(0)

def myfilter1(val):
    return __myfilter(val, acc1)

def myfilter2(val):
    return __myfilter(val, acc2)

my_udf1 = udf(myfilter1, BooleanType())
my_udf2 = udf(myfilter2, BooleanType())

df.show()
# +---+----+----+
# |key|val1|val2|
# +---+----+----+
# |  a|   1|   1|
# |  b|   2|   2|
# |  c|   3|   3|
# +---+----+----+

df = df.filter(my_udf1(col("val1")))
# df.show()
# +---+----+----+
# |key|val1|val2|
# +---+----+----+
# |  a|   1|   1|
# +---+----+----+
# expected acc1: 2
# expected acc2: 0

df = df.filter(my_udf2(col("val2")))
# df.show()
# +---+----+----+
# |key|val1|val2|
# +---+----+----+
# |  a|   1|   1|
# +---+----+----+
# expected acc1: 2
# expected acc2: 0
df.show()
print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
{code}
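
Presumably the optimizer collapses the two consecutive filters into one, so both UDFs end up being evaluated against the same input rows (df.explain() on the final DataFrame should show this), but I have not confirmed that. A possible workaround, sketched below under that assumption, is to materialize the result of the first filter before attaching the second one, e.g. by caching it and triggering an action in between. This is only a sketch, not a verified fix, since accumulator updates can also be double-counted if a stage is re-executed.

{code:title=workaround_sketch.py|borderStyle=solid}
# Sketch of a possible workaround (not verified): materialize the result
# of the first filter before attaching the second one, so the second UDF
# only sees rows that survived the first filter.
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", "val1", "val2"])

acc1 = sc.accumulator(0)
acc2 = sc.accumulator(0)

def keep_and_count(val, acc):
    # keep rows with val < 2; count filtered-out rows in the accumulator
    if val < 2:
        return True
    acc.add(1)
    return False

my_udf1 = udf(lambda v: keep_and_count(v, acc1), BooleanType())
my_udf2 = udf(lambda v: keep_and_count(v, acc2), BooleanType())

df = df.filter(my_udf1(col("val1"))).cache()
df.count()  # action: forces the first filter to run here; acc1 should now be 2

df = df.filter(my_udf2(col("val2")))
df.show()   # the second filter should only see the cached, pre-filtered rows

print("acc1: %s" % acc1.value)  # hoped for: 2
print("acc2: %s" % acc2.value)  # hoped for: 0
{code}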


