[ https://issues.apache.org/jira/browse/SPARK-30952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
bruce_zhao updated SPARK-30952: ------------------------------- Description: We are trying to apply the three-sigma rule to grouped data to detect anomalies. We found that the Python worker always crashes when a group returns an empty DataFrame (empty means no anomalies were found in that group). Sample Code: {code:java} from pyspark.sql.functions import pandas_udf, PandasUDFType from pyspark.sql.types import StructField, StructType, StringType, LongType import pandas as pd from pyspark.sql import SparkSession import numpy as np def check_pdf(): schema = StructType([ StructField("customer_id", StringType(), True), StructField("count", LongType(), True) ]) @pandas_udf(schema, PandasUDFType.GROUPED_MAP) def handler(pdf): mean = float(np.mean(pdf["count"])) sigma = float(np.std(pdf["count"], ddof=1)) return pdf[pdf["count"] > mean + 3 * sigma] return handler def main(): spark = SparkSession.builder \ .appName("AppTest") \ .master("local[4]") \ .config("spark.driver.host", "localhost") \ .config("spark.sql.shuffle.partitions", 2) \ .getOrCreate() df = spark.createDataFrame([ { "count": 15, "customer_id": "c1" }, { "count": 11, "customer_id": "c1" }, { "count": 11, "customer_id": "c2" } ]) result = df.groupby("customer_id").apply(check_pdf()).collect() print(result) spark.stop() if __name__ == '__main__': main() {code} Exception: {code:java} 2020-02-26 10:56:45 ERROR Executor:91 - Exception in task 0.0 in stage 1.0 (TID 4) org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:486) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:475) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:178) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122) at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:159) ... 20 more {code} was: We are trying to apply three-sigma rule in grouped data to detect anomaly data. We found that it's always crashed when a group returns an empty DataFrame (empty means no anomaly). 
Sample Code: {code:java} from pyspark.sql.functions import pandas_udf, PandasUDFType from pyspark.sql.types import StructField, StructType, StringType, LongType import pandas as pd from pyspark.sql import SparkSession import numpy as np def check_pdf(): schema = StructType([ StructField("customer_id", StringType(), True), StructField("count", LongType(), True) ]) @pandas_udf(schema, PandasUDFType.GROUPED_MAP) def handler(pdf): mean = float(np.mean(pdf["count"])) sigma = float(np.std(pdf["count"], ddof=1)) print(mean+3*sigma) return pdf[pdf["count"] > mean + 3 * sigma] return handler def main(): spark = SparkSession.builder \ .appName("AppTest") \ .master("local[4]") \ .config("spark.driver.host", "localhost") \ .config("spark.sql.shuffle.partitions", 2) \ .getOrCreate() df = spark.createDataFrame([ { "count": 15, "customer_id": "c1" }, { "count": 11, "customer_id": "c1" }, { "count": 11, "customer_id": "c2" } ]) result = df.groupby("customer_id").apply(check_pdf()).collect() print(result) spark.stop() if __name__ == '__main__': main() {code} Exception: {code:java} 2020-02-26 10:56:45 ERROR Executor:91 - Exception in task 0.0 in stage 1.0 (TID 4) org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:486) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:475) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:178) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:159) ... 20 more {code} > Grouped pandas_udf crashed when a group returned an empty DataFrame > -------------------------------------------------------------------- > > Key: SPARK-30952 > URL: https://issues.apache.org/jira/browse/SPARK-30952 > Project: Spark > Issue Type: Bug > Components: PySpark > Affects Versions: 2.4.0 > Reporter: bruce_zhao > Priority: Major > > We are trying to apply three-sigma rule in grouped data to detect anomaly > data. We found that it's always crashed when a group returns an empty > DataFrame (empty means no anomaly). 
> > Sample Code: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > from pyspark.sql.types import StructField, StructType, StringType, LongType > import pandas as pd > from pyspark.sql import SparkSession > import numpy as np > def check_pdf(): > schema = StructType([ > StructField("customer_id", StringType(), True), > StructField("count", LongType(), True) > ]) > @pandas_udf(schema, PandasUDFType.GROUPED_MAP) > def handler(pdf): > mean = float(np.mean(pdf["count"])) > sigma = float(np.std(pdf["count"], ddof=1)) > return pdf[pdf["count"] > mean + 3 * sigma] > return handler > def main(): > spark = SparkSession.builder \ > .appName("AppTest") \ > .master("local[4]") \ > .config("spark.driver.host", "localhost") \ > .config("spark.sql.shuffle.partitions", 2) \ > .getOrCreate() > df = spark.createDataFrame([ > { > "count": 15, > "customer_id": "c1" > }, > { > "count": 11, > "customer_id": "c1" > }, > { > "count": 11, > "customer_id": "c2" > } > ]) > result = df.groupby("customer_id").apply(check_pdf()).collect() > print(result) > spark.stop() > if __name__ == '__main__': > main() > {code} > Exception: > {code:java} > 2020-02-26 10:56:45 ERROR Executor:91 - Exception in task 0.0 in stage 1.0 > (TID 4) > org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) > at > org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:486) > > at > org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:475) > > at > scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) > at > org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:178) > > at > org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122) > > at > org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406) > > at > 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) > > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) > > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) > at org.apache.spark.scheduler.Task.run(Task.scala:121) > at > org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748)Caused by: java.io.EOFException at > java.io.DataInputStream.readInt(DataInputStream.java:392) at > org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:159) > > ... 20 more > {code} > > > -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org