[jira] [Commented] (SPARK-25591) PySpark Accumulators with multiple PythonUDFs
[ https://issues.apache.org/jira/browse/SPARK-25591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732574#comment-16732574 ]

Hyukjin Kwon commented on SPARK-25591:
--------------------------------------

+1

> PySpark Accumulators with multiple PythonUDFs
> ---------------------------------------------
>
>                 Key: SPARK-25591
>                 URL: https://issues.apache.org/jira/browse/SPARK-25591
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.2
>            Reporter: Abdeali Kothari
>            Assignee: Liang-Chi Hsieh
>            Priority: Blocker
>              Labels: correctness
>             Fix For: 2.4.0
>
>
> When having multiple Python UDFs - the last Python UDF's accumulator is the
> only accumulator that gets updated.
> {code:python}
> import pyspark
> from pyspark.sql import SparkSession, Row
> from pyspark.sql import functions as F
> from pyspark.sql import types as T
> from pyspark import AccumulatorParam
> spark = SparkSession.builder.getOrCreate()
> spark.sparkContext.setLogLevel("ERROR")
> test_accum = spark.sparkContext.accumulator(0.0)
> SHUFFLE = False
> def main(data):
>     print(">>> Check0", test_accum.value)
>     def test(x):
>         global test_accum
>         test_accum += 1.0
>         return x
>     print(">>> Check1", test_accum.value)
>     def test2(x):
>         global test_accum
>         test_accum += 100.0
>         return x
>     print(">>> Check2", test_accum.value)
>     func_udf = F.udf(test, T.DoubleType())
>     print(">>> Check3", test_accum.value)
>     func_udf2 = F.udf(test2, T.DoubleType())
>     print(">>> Check4", test_accum.value)
>     data = data.withColumn("out1", func_udf(data["a"]))
>     if SHUFFLE:
>         data = data.repartition(2)
>     print(">>> Check5", test_accum.value)
>     data = data.withColumn("out2", func_udf2(data["b"]))
>     if SHUFFLE:
>         data = data.repartition(2)
>     print(">>> Check6", test_accum.value)
>     data.show()  # ACTION
>     print(">>> Check7", test_accum.value)
>     return data
> df = spark.createDataFrame([
>     [1.0, 2.0]
> ], schema=T.StructType([T.StructField(field_name, T.DoubleType(), True) for field_name in ["a", "b"]]))
> df2 = main(df)
> {code}
> {code:python}
> Output 1 - with SHUFFLE=False
> ...
> # >>> Check7 100.0
> Output 2 - with SHUFFLE=True
> ...
> # >>> Check7 101.0
> {code}
> Basically looks like:
>  - Accumulator works only for last UDF before a shuffle-like operation

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
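The report quoted above boils down to a mismatch between the accumulator total one would expect (every UDF adds its increment once per row, so 1.0 + 100.0 = 101.0 for the single-row DataFrame) and the total observed without a shuffle (100.0). A minimal sketch of that comparison follows. The helper names `expected_total` and `observed_total` are hypothetical and not part of the issue or of PySpark; the sketch also applies every UDF to column "a" and uses `collect()` as the action, which differs slightly from the original reproduction. Spark does not guarantee exactly-once accumulator updates inside transformations, so treat the comparison as a smoke check rather than a definitive test.

```python
def expected_total(num_rows, increments):
    """Total the accumulator should reach if each UDF runs once per row."""
    return float(num_rows) * sum(increments)


def observed_total(spark, increments, shuffle=False):
    """Chain one Python UDF per increment and return the accumulator value.

    `spark` is an existing SparkSession. pyspark is imported lazily so that
    `expected_total` stays usable without a Spark installation.
    """
    from pyspark.sql import functions as F
    from pyspark.sql import types as T

    accum = spark.sparkContext.accumulator(0.0)

    def make_udf(step):
        def bump(x):
            # Accumulator.add has the same effect as `accum += step`
            # and avoids the `global` declaration used in the report.
            accum.add(step)
            return x
        return F.udf(bump, T.DoubleType())

    data = spark.createDataFrame([(1.0, 2.0)], ["a", "b"])
    for i, step in enumerate(increments, start=1):
        data = data.withColumn("out%d" % i, make_udf(step)(data["a"]))
        if shuffle:
            data = data.repartition(2)
    data.collect()  # action: forces the UDFs to run
    return accum.value
```

With a running session, `observed_total(spark, [1.0, 100.0])` can be compared against `expected_total(1, [1.0, 100.0])`; on a build that includes the fix the two values would be expected to agree with and without `shuffle=True`.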
[jira] [Commented] (SPARK-25591) PySpark Accumulators with multiple PythonUDFs
[ https://issues.apache.org/jira/browse/SPARK-25591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732573#comment-16732573 ]

Dongjoon Hyun commented on SPARK-25591:
---------------------------------------

Thank you for confirming, [~viirya]. Yes. Please make two PRs for them.
[jira] [Commented] (SPARK-25591) PySpark Accumulators with multiple PythonUDFs
[ https://issues.apache.org/jira/browse/SPARK-25591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732569#comment-16732569 ]

Liang-Chi Hsieh commented on SPARK-25591:
-----------------------------------------

I can make backport PRs if you need. [~dongjoon]
[jira] [Commented] (SPARK-25591) PySpark Accumulators with multiple PythonUDFs
[ https://issues.apache.org/jira/browse/SPARK-25591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732567#comment-16732567 ]

Liang-Chi Hsieh commented on SPARK-25591:
-----------------------------------------

This is a bug fix, so I think it makes sense to backport it to branch-2.3 and branch-2.2 if needed.
[jira] [Commented] (SPARK-25591) PySpark Accumulators with multiple PythonUDFs
[ https://issues.apache.org/jira/browse/SPARK-25591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732556#comment-16732556 ]

Dongjoon Hyun commented on SPARK-25591:
---------------------------------------

Hi, [~viirya], [~hyukjin.kwon].
This is only in branch-2.4. Can we backport this to older branches like branch-2.3 and branch-2.2?

cc [~AbdealiJK]
[jira] [Commented] (SPARK-25591) PySpark Accumulators with multiple PythonUDFs
[ https://issues.apache.org/jira/browse/SPARK-25591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642506#comment-16642506 ]

Xiao Li commented on SPARK-25591:
---------------------------------

RC3 is not out yet. Thus, RC3 will include the fix.
[jira] [Commented] (SPARK-25591) PySpark Accumulators with multiple PythonUDFs
[ https://issues.apache.org/jira/browse/SPARK-25591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16639059#comment-16639059 ]

Apache Spark commented on SPARK-25591:
--------------------------------------

User 'viirya' has created a pull request for this issue:
https://github.com/apache/spark/pull/22635

> PySpark Accumulators with multiple PythonUDFs
> ---------------------------------------------
>
>                 Key: SPARK-25591
>                 URL: https://issues.apache.org/jira/browse/SPARK-25591
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.2
>            Reporter: Abdeali Kothari
>            Priority: Major
[jira] [Commented] (SPARK-25591) PySpark Accumulators with multiple PythonUDFs
[ https://issues.apache.org/jira/browse/SPARK-25591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16639061#comment-16639061 ]

Apache Spark commented on SPARK-25591:
--------------------------------------

User 'viirya' has created a pull request for this issue:
https://github.com/apache/spark/pull/22635