[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators
[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267567#comment-16267567 ]

Andrew Snare commented on SPARK-22541:
--------------------------------------

Although this ship has sailed, a search brought me here while looking into this issue.

Even though the query optimisation is intended and its behaviour known, something really doesn't feel right since it can produce surprising results.

Given:
{code}
>>> recip = sf.udf(lambda x: 1 / x, FloatType())
{code}

This fails:
{code}
>>> spark.createDataFrame([[0.0], [1.0]], ['value']) \
...     .select(sf.when(sf.col('value') > 0, recip('value'))).show()
[…]
ZeroDivisionError: float division by zero
{code}

This succeeds:
{code}
>>> spark.createDataFrame([[0.0], [1.0]], ['value']) \
...     .select(sf.when(sf.col('value') > 0, 1 / sf.col('value'))).show()
+------------------------------------------+
|CASE WHEN (value > 0) THEN (1 / value) END|
+------------------------------------------+
|                                      null|
|                                       1.0|
+------------------------------------------+
{code}

The Scala equivalents of _both_ succeed:
{code}
scala> val recip = udf((x: Float) => 1 / x)
scala> Seq(0.0, 1.0).toDF.select(when('value > 0, recip('value))).show()
+-----------------------------------------+
|CASE WHEN (value > 0) THEN UDF(value) END|
+-----------------------------------------+
|                                     null|
|                                      1.0|
+-----------------------------------------+

scala> Seq(0.0, 1.0).toDF.select(when('value > 0, lit(1) / 'value)).show()
+------------------------------------------+
|CASE WHEN (value > 0) THEN (1 / value) END|
+------------------------------------------+
|                                      null|
|                                       1.0|
+------------------------------------------+
{code}

> Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators
> -------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-22541
> URL: https://issues.apache.org/jira/browse/SPARK-22541
> Project: Spark
> Issue Type: Documentation
> Components: PySpark
> Affects Versions: 2.2.0
> Environment: pyspark 2.2.0, ubuntu
> Reporter: Janne K. Olesen
> Assignee: Liang-Chi Hsieh
> Fix For: 2.3.0
>
> I'm using udf filters and accumulators to keep track of filtered rows in dataframes.
> If I'm applying multiple filters one after the other, they seem to be executed in parallel, not in sequence, which messes with the accumulators I'm using to keep track of filtered data.
> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
>
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", "val1", "val2"])
>
> def __myfilter(val, acc):
>     if val < 2:
>         return True
>     else:
>         acc.add(1)
>         return False
>
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
>
> def myfilter1(val):
>     return __myfilter(val, acc1)
>
> def myfilter2(val):
>     return __myfilter(val, acc2)
>
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
>
> df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # |  b|   2|   2|
> # |  c|   3|   3|
> # +---+----+----+
>
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
>
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
>
> df.show()
> print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
> print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
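Because a Python UDF may be called on rows that the surrounding `when` would logically exclude, one defensive option is to move the guard into the function itself. The following is a minimal sketch in plain Python, assuming that returning None (which Spark would surface as null) is acceptable for non-positive input. The name `safe_recip` is hypothetical and does not appear in the report.

```python
def safe_recip(x):
    """Reciprocal that tolerates the inputs the outer condition was meant to exclude."""
    # The guard lives inside the function, so it holds no matter which rows
    # the function ends up being called on.
    if x is None or x <= 0:
        return None
    return 1 / x


# Every value is passed to the function, mirroring unconditional UDF evaluation.
results = [safe_recip(v) for v in [0.0, 1.0]]
print(results)
```

Wrapped in the same way as `recip` above, for example `sf.udf(safe_recip, FloatType())`, the function should no longer rely on `when` to shield it from zero.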
[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators
[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16260240#comment-16260240 ]

Liang-Chi Hsieh commented on SPARK-22541:
-----------------------------------------

Since this is known behavior, I will change this from bug to documentation.
[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators
[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258886#comment-16258886 ]

Apache Spark commented on SPARK-22541:
--------------------------------------

User 'viirya' has created a pull request for this issue:
https://github.com/apache/spark/pull/19787
[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators
[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258873#comment-16258873 ]

Liang-Chi Hsieh commented on SPARK-22541:
-----------------------------------------

As with the case of using Python udfs with conditional expressions, I don't think we have an easy fix for this.

Btw, I think we should also explicitly note this behavior in the documentation.
[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators
[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258868#comment-16258868 ]

Liang-Chi Hsieh commented on SPARK-22541:
-----------------------------------------

Sorry, my previous reply is not completely correct.

This behavior is related to how PySpark runs Python udfs. We can show the query plan of the {{df}}:
{code}
...
== Physical Plan ==
*Project [key#0, val1#1L, val2#2L]
+- *Filter (pythonUDF0#21 && pythonUDF1#22)
   +- BatchEvalPython [myfilter1(val1#1L), myfilter2(val2#2L)], [key#0, val1#1L, val2#2L, pythonUDF0#21, pythonUDF1#22]
      +- Scan ExistingRDD[key#0,val1#1L,val2#2L]
{code}

The Python udfs are pushed down to a special physical operator, {{BatchEvalPython}}, for execution. Due to implementation details, the pushed-down Python udfs are not conditional. That is to say, they are evaluated on all rows, even if the original query logically evaluates them on only some of the rows by means of conditional expressions such as when or if. The issue you found here has the same cause.
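The effect of this plan shape on the reporter's counters can be reproduced without Spark. The following is a minimal sketch in plain Python that only models the two steps visible in the plan (evaluate every udf on every row, then filter on the conjunction). It is not how {{BatchEvalPython}} is implemented, and the `counts` dictionary and `make_filter` helper are hypothetical stand-ins for the accumulators and `__myfilter` in the report.

```python
rows = [("a", 1, 1), ("b", 2, 2), ("c", 3, 3)]

# Plain counters stand in for the Spark accumulators acc1 and acc2.
counts = {"acc1": 0, "acc2": 0}


def make_filter(name):
    """Build a predicate that counts every value it rejects."""
    def keep(val):
        if val < 2:
            return True
        counts[name] += 1
        return False
    return keep


myfilter1 = make_filter("acc1")
myfilter2 = make_filter("acc2")

# Step 1 (modelled on BatchEvalPython): run every udf on every row and
# append the results as extra columns.
evaluated = [(key, v1, v2, myfilter1(v1), myfilter2(v2)) for key, v1, v2 in rows]

# Step 2 (modelled on Filter): keep rows where both result columns are true.
kept = [(key, v1, v2) for key, v1, v2, r1, r2 in evaluated if r1 and r2]

print(kept)    # [('a', 1, 1)]
print(counts)  # {'acc1': 2, 'acc2': 2}
```

The filtered rows are the expected ones, but the second counter reaches 2 rather than 0 because the second predicate also ran on the rows that the first one rejected.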
[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators
[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257004#comment-16257004 ]

Eric Maynard commented on SPARK-22541:
--------------------------------------

Yeah, this is a common problem when you have side effects in your transformations. If you need to enforce a specific order on your transformations or otherwise split them up (rather than letting Spark combine them), you can try putting other operations, such as repartitions, between the transformation functions.
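Another way to sidestep the side effects is to keep the predicates pure and derive the per-filter counts from their results afterwards. The following is a minimal sketch in plain Python, assuming the numbers of interest are "dropped by the first filter" and "passed the first filter but dropped by the second". The names `passes`, `dropped_by_first` and `dropped_by_second` are hypothetical.

```python
rows = [("a", 1, 1), ("b", 2, 2), ("c", 3, 3)]


def passes(val):
    # Pure predicate: no counter is touched, so it is safe to call on any row.
    return val < 2


# Evaluate both predicates on every row; the order of evaluation is irrelevant.
flags = [(passes(v1), passes(v2)) for _, v1, v2 in rows]

# Derive the counts from the recorded results instead of from side effects.
dropped_by_first = sum(1 for p1, _ in flags if not p1)
dropped_by_second = sum(1 for p1, p2 in flags if p1 and not p2)
kept = [row for row, (p1, p2) in zip(rows, flags) if p1 and p2]

print(dropped_by_first, dropped_by_second, kept)
```

Because the counts are computed from the recorded outcomes, they match the sequential reading of the two filters (2 and 0 for this data) even though both predicates ran on every row.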
[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators
[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256676#comment-16256676 ]

Janne K. Olesen commented on SPARK-22541:
-----------------------------------------

I agree, the filtered results are correct, but that is beside the point.

It seems like query optimization does something like
{noformat}
for row in df:
    result_a = filter1(row)
    result_b = filter2(row)
    result = result_a && result_b
{noformat}
but in my opinion it should be
{noformat}
for row in df:
    result_a = filter1(row)
    if result_a == True:
        return filter2(row)
    else:
        return False
{noformat}

If filter2 is executed regardless of the result of filter1, this can lead to strange errors. Consider the following example:
{code:title=Example.py}
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df_input = spark.createDataFrame([("a", None), ("b", 2), ("c", 3)], ["key", "val"])

# works as expected
df = df_input.filter(col("val").isNotNull())
df = df.filter(col("val") > 2)
df.show()

# this will raise an error and fail
# TypeError: '>' not supported between instances of 'NoneType' and 'int'
isNotNone = udf(lambda x: x is not None, BooleanType())
filter2 = udf(lambda x: x > 2, BooleanType())
df = df_input.filter(isNotNone(col("val")))
df = df.filter(filter2(col("val")))
df.show()
{code}
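The difference between the two pseudocode loops can be made concrete without Spark. The following is a minimal sketch in plain Python, using the same data as the example; the function names `is_not_none`, `greater_than_two`, `eager` and `short_circuit` are hypothetical and merely illustrate the two evaluation orders.

```python
rows = [("a", None), ("b", 2), ("c", 3)]


def is_not_none(x):
    return x is not None


def greater_than_two(x):
    return x > 2  # raises TypeError when x is None (Python 3)


def eager(row):
    # Both predicates always run, as in the first pseudocode loop.
    result_a = is_not_none(row[1])
    result_b = greater_than_two(row[1])
    return result_a and result_b


def short_circuit(row):
    # The second predicate runs only if the first one passed.
    return is_not_none(row[1]) and greater_than_two(row[1])


print([row for row in rows if short_circuit(row)])  # [('c', 3)]

try:
    [row for row in rows if eager(row)]
except TypeError as exc:
    print("eager evaluation failed:", exc)
```

The short-circuit version never passes None to the comparison, whereas the eager version fails on the first row, which matches the TypeError noted in the example.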
[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators
[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256441#comment-16256441 ]

Liang-Chi Hsieh commented on SPARK-22541:
-----------------------------------------

Due to query optimization, the two filters are combined and only one filter operation is actually performed. The filtered results are correct. So I don't think this is a bug.