[ https://issues.apache.org/jira/browse/SPARK-28269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bryan Cutler updated SPARK-28269:
---------------------------------
    Summary: Pandas Grouped Map UDF can get deadlocked  (was: ArrowStreamPandasSerializer get stack)

> Pandas Grouped Map UDF can get deadlocked
> -----------------------------------------
>
>                 Key: SPARK-28269
>                 URL: https://issues.apache.org/jira/browse/SPARK-28269
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.3
>            Reporter: Modi Tamam
>            Priority: Major
>         Attachments: Untitled.xcf
>
> I'm working with Pyspark version 2.4.3.
> I have a big data frame:
> * ~15M rows
> * ~130 columns
> * ~2.5 GB - I've converted it to a Pandas data frame; pickling it (pandas_df.to_pickle()) resulted in a file of size 2.5 GB.
>
> I have some code that groups this data frame and applies a Pandas UDF:
>
> {code:python}
> from pyspark.sql import Row
> from pyspark.sql.functions import lit, pandas_udf, PandasUDFType, to_json
> from pyspark.sql.types import *
> from pyspark.sql import functions as F
>
> initial_list = range(4500)
> rdd = sc.parallelize(initial_list)
> rdd = rdd.map(lambda x: Row(val=x))
> initial_spark_df = spark.createDataFrame(rdd)
>
> cols_count = 132
> rows = 1000
>
> # ------------------- Start generating the big data frame -------------------
> # Generating the schema
> schema = StructType([StructField(str(i), IntegerType()) for i in range(cols_count)])
>
> @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
> def random_pd_df_generator(df):
>     import numpy as np
>     import pandas as pd
>     return pd.DataFrame(np.random.randint(0, 100, size=(rows, cols_count)), columns=range(cols_count))
>
> full_spark_df = initial_spark_df.groupBy("val").apply(random_pd_df_generator)
> # ------------------- End generating the big data frame ---------------------
>
> # ------------------- Start the bug reproduction ----------------------------
> grouped_col = "col_0"
>
> @pandas_udf("%s string" % grouped_col, PandasUDFType.GROUPED_MAP)
> def very_simpl_udf(pdf):
>     import pandas as pd
>     ret_val = pd.DataFrame({grouped_col: [str(pdf[grouped_col].iloc[0])]})
>     return ret_val
>
> # In order to create a huge dataset, I've set all of the grouped_col values to
> # a single value, then grouped everything into a single group.
> # Here is where the program gets stuck:
> full_spark_df.withColumn(grouped_col, F.lit('0')).groupBy(grouped_col).apply(very_simpl_udf).show()
>
> assert False, "If we're here, it means that the issue wasn't reproduced"
> {code}
>
> The above code gets stuck in the ArrowStreamPandasSerializer (on the first line, when reading a batch from the reader):
>
> {code:python}
> for batch in reader:
>     yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
> {code}
>
> You can just run the first code snippet and it will reproduce the issue.
> Open a Pyspark shell with this configuration:
>
> {code:bash}
> pyspark --conf "spark.python.worker.memory=3G" \
>         --conf "spark.executor.memory=20G" \
>         --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
>         --conf "spark.driver.memory=10G"
> {code}
>
> Versions:
> * pandas - 0.24.2
> * pyarrow - 0.13.0
> * Spark - 2.4.2
> * Python - 2.7.16
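For context on the execution model the reproduction exercises: with a GROUPED_MAP pandas UDF, all rows that share a grouping key are materialized as one pandas DataFrame in a single Python worker, so forcing every row to the same key (the F.lit('0') column above) concentrates the entire ~2.5 GB dataset in one worker. A minimal, self-contained sketch of that per-group semantics follows; the local SparkSession setup and the demo_udf name are illustrative, not taken from the report:

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

spark = SparkSession.builder.master("local[2]").getOrCreate()

# Nine rows spread over three keys.
df = spark.createDataFrame([(i % 3, i) for i in range(9)], ["key", "val"])

# Each invocation receives *all* rows of one key as a single pandas
# DataFrame in one Python worker, which is why a single giant group
# can overwhelm that worker.
@pandas_udf("key long, n long", PandasUDFType.GROUPED_MAP)
def demo_udf(pdf):
    return pd.DataFrame({"key": [pdf["key"].iloc[0]], "n": [len(pdf)]})

df.groupBy("key").apply(demo_udf).show()
{code}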
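On the serializer loop quoted above: each `for batch in reader` step is a blocking read of one complete Arrow record batch from the JVM, so the Python worker waits there until the other side produces data. A standalone pyarrow round-trip showing the same reader pattern, with the stream contents invented purely for illustration:

{code:python}
import pandas as pd
import pyarrow as pa

# Write a tiny record-batch stream into an in-memory buffer...
batch = pa.RecordBatch.from_pandas(pd.DataFrame({"col_0": ["0"]}))
sink = pa.BufferOutputStream()
writer = pa.RecordBatchStreamWriter(sink, batch.schema)
writer.write_batch(batch)
writer.close()

# ...then iterate it the way ArrowStreamPandasSerializer does: each
# iteration blocks until a full batch is available on the stream.
reader = pa.RecordBatchStreamReader(sink.getvalue())
for batch in reader:
    print(pa.Table.from_batches([batch]).to_pandas())
{code}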