[jira] [Commented] (SPARK-25591) PySpark Accumulators with multiple PythonUDFs

2019-01-02 Thread Hyukjin Kwon (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-25591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732574#comment-16732574 ]

Hyukjin Kwon commented on SPARK-25591:
--------------------------------------

+1

> PySpark Accumulators with multiple PythonUDFs
> ---------------------------------------------
>
>                 Key: SPARK-25591
>                 URL: https://issues.apache.org/jira/browse/SPARK-25591
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.2
>            Reporter: Abdeali Kothari
>            Assignee: Liang-Chi Hsieh
>            Priority: Blocker
>              Labels: correctness
>             Fix For: 2.4.0
>
>
> When a query contains multiple Python UDFs, only the last Python UDF's
> accumulator gets updated.
> {code:python}
> import pyspark
> from pyspark.sql import SparkSession, Row
> from pyspark.sql import functions as F
> from pyspark.sql import types as T
> from pyspark import AccumulatorParam
>
> spark = SparkSession.builder.getOrCreate()
> spark.sparkContext.setLogLevel("ERROR")
>
> test_accum = spark.sparkContext.accumulator(0.0)
> SHUFFLE = False
>
> def main(data):
>     print(">>> Check0", test_accum.value)
>
>     def test(x):
>         global test_accum
>         test_accum += 1.0
>         return x
>
>     print(">>> Check1", test_accum.value)
>
>     def test2(x):
>         global test_accum
>         test_accum += 100.0
>         return x
>
>     print(">>> Check2", test_accum.value)
>     func_udf = F.udf(test, T.DoubleType())
>     print(">>> Check3", test_accum.value)
>     func_udf2 = F.udf(test2, T.DoubleType())
>     print(">>> Check4", test_accum.value)
>
>     data = data.withColumn("out1", func_udf(data["a"]))
>     if SHUFFLE:
>         data = data.repartition(2)
>     print(">>> Check5", test_accum.value)
>
>     data = data.withColumn("out2", func_udf2(data["b"]))
>     if SHUFFLE:
>         data = data.repartition(2)
>     print(">>> Check6", test_accum.value)
>
>     data.show()  # ACTION
>     print(">>> Check7", test_accum.value)
>     return data
>
> df = spark.createDataFrame([
>     [1.0, 2.0]
> ], schema=T.StructType([T.StructField(field_name, T.DoubleType(), True)
>                         for field_name in ["a", "b"]]))
>
> df2 = main(df)
> {code}
> {code:python}
> # Output 1 - with SHUFFLE=False
> # ...
> # >>> Check7 100.0
>
> # Output 2 - with SHUFFLE=True
> # ...
> # >>> Check7 101.0
> {code}
> Basically it looks like:
>  - The accumulator works only for the last UDF before a shuffle-like operation.
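
A minimal two-accumulator variant makes the title's claim easier to see (an editor's sketch, not the reporter's code; the names acc_first/acc_second are illustrative). Per the report, on affected 2.3.x versions only the accumulator of the last UDF in the chain should move:

{code:python}
# Sketch under the assumptions above: one accumulator per UDF.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

spark = SparkSession.builder.getOrCreate()
acc_first = spark.sparkContext.accumulator(0)
acc_second = spark.sparkContext.accumulator(0)

def first(x):
    acc_first.add(1)   # runs on executors; updates are merged on the driver
    return x

def second(x):
    acc_second.add(1)
    return x

udf_first = F.udf(first, T.DoubleType())
udf_second = F.udf(second, T.DoubleType())

df = spark.createDataFrame([(1.0, 2.0)], ["a", "b"])
df.withColumn("o1", udf_first("a")).withColumn("o2", udf_second("b")).show()

# Expected: 1 1. Per the report above, 2.3.2 prints 0 1 because only
# the last UDF's accumulator updates come back from the Python worker.
print(acc_first.value, acc_second.value)
{code}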



[jira] [Commented] (SPARK-25591) PySpark Accumulators with multiple PythonUDFs

2019-01-02 Thread Dongjoon Hyun (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-25591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732573#comment-16732573 ]

Dongjoon Hyun commented on SPARK-25591:
---------------------------------------

Thank you for confirming, [~viirya]. Yes. Please make two PRs for them.

[jira] [Commented] (SPARK-25591) PySpark Accumulators with multiple PythonUDFs

2019-01-02 Thread Liang-Chi Hsieh (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-25591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732569#comment-16732569 ]

Liang-Chi Hsieh commented on SPARK-25591:
-----------------------------------------

I can make backport PRs if needed. [~dongjoon]

[jira] [Commented] (SPARK-25591) PySpark Accumulators with multiple PythonUDFs

2019-01-02 Thread Liang-Chi Hsieh (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-25591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732567#comment-16732567 ]

Liang-Chi Hsieh commented on SPARK-25591:
-----------------------------------------

This is a bug fix, so I think it makes sense to backport it to branch-2.3
and branch-2.2 if needed.


[jira] [Commented] (SPARK-25591) PySpark Accumulators with multiple PythonUDFs

2019-01-02 Thread Dongjoon Hyun (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-25591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732556#comment-16732556 ]

Dongjoon Hyun commented on SPARK-25591:
---------------------------------------

Hi, [~viirya], [~hyukjin.kwon].

This fix is only in branch-2.4. Can we backport it to older branches like
branch-2.3 and branch-2.2?

cc [~AbdealiJK]



[jira] [Commented] (SPARK-25591) PySpark Accumulators with multiple PythonUDFs

2018-10-08 Thread Xiao Li (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-25591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642506#comment-16642506 ]

Xiao Li commented on SPARK-25591:
---------------------------------

RC3 is not out yet, so it will include the fix.



[jira] [Commented] (SPARK-25591) PySpark Accumulators with multiple PythonUDFs

2018-10-04 Thread Apache Spark (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-25591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16639059#comment-16639059 ]

Apache Spark commented on SPARK-25591:
--------------------------------------

User 'viirya' has created a pull request for this issue:
https://github.com/apache/spark/pull/22635
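
A regression-style check for this behavior could look like the following sketch (hypothetical, not taken from the linked PR): chain two UDFs that update the same accumulator and assert that both updates arrive after a single action.

{code:python}
# Hypothetical check: with the fix, updates from both chained UDFs
# must reach the driver-side accumulator after one action.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

spark = SparkSession.builder.getOrCreate()
acc = spark.sparkContext.accumulator(0.0)

def inc_one(x):
    acc.add(1.0)
    return x

def inc_hundred(x):
    acc.add(100.0)
    return x

df = spark.createDataFrame([(1.0, 2.0)], ["a", "b"])
df = df.withColumn("o1", F.udf(inc_one, T.DoubleType())("a"))
df = df.withColumn("o2", F.udf(inc_hundred, T.DoubleType())("b"))
df.collect()  # single action evaluates both UDFs

assert acc.value == 101.0, acc.value  # reported as 100.0 before the fix
{code}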
