[ https://issues.apache.org/jira/browse/SPARK-25898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kaushalya updated SPARK-25898:
------------------------------
Summary: pandas_udf not catching errors early on (was: pandas_udf)

pandas_udf not catching errors early on
---------------------------------------

Key: SPARK-25898
URL: https://issues.apache.org/jira/browse/SPARK-25898
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.3.2
Reporter: Kaushalya
Priority: Major

When pandas_udfs are used, errors in the UDF implementation are not reported at the first point of invocation. They only surface further down the line, when the returned DataFrame is accessed again. The issue does not appear with small datasets, only when the data is distributed across servers.

{code:python}
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

spark = SparkSession.builder.appName("myapp").getOrCreate()

# Read the data first: the output schema is derived from its columns.
one_second_df = spark.read.parquet("data")
schema = StructType(one_second_df.schema.fields +
                    [StructField("gradient_order_rate", DoubleType())])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def gradient(df):
    # The new column must match the extra field declared in the schema.
    df["gradient_order_rate"] = np.gradient(df.val_count)
    return df

agg_df = one_second_df.groupby("id").apply(gradient)
agg_df.show(2)

#### THE FOLLOWING LINES THREW ERRORS such as:
#     "Shape of array too small to calculate a numerical gradient, "
# ValueError: Shape of array too small to calculate a numerical gradient,
# at least (edge_order + 1) elements are required.
fd = agg_df.filter(agg_df["id"] == "a")
fd.show(5)

# Weirdly, this error is only thrown by the second show() call, although the
# first one should technically have run through the entire dataset (it
# probably doesn't). I think this should be fixed: errors that only manifest
# further down the line are difficult to catch if they cannot be identified
# earlier on.
{code}

Please also find working code for a similar workflow, but with a much smaller dataset:

{code:python}
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

spark = SparkSession.builder.appName("myapp").getOrCreate()

df3 = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
    ("key", "value1", "value2")
)

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_value1", DoubleType()),
    StructField("avg_value2", DoubleType()),
    StructField("sum_avg", DoubleType()),
    StructField("sub_avg", DoubleType())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    gr = df["key"].iloc[0]
    x = df.value1.mean()
    y = df.value2.mean()
    w = x + y  # sum of the two averages
    z = x - y  # difference of the two averages
    # One output row per group, columns in schema order.
    return pd.DataFrame([[gr, x, y, w, z]])

ss = df3.groupby("key").apply(g)
ss.show(5)

my_filter = ss.filter(ss["key"] == "a")
my_filter.show(2)
{code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
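Because `show(n)` only needs to compute enough partitions to return n rows, a group that breaks the UDF may simply not be evaluated until a later action touches its partition. One way to catch such failures earlier is to run the grouped-map function eagerly on a local pandas sample, group by group, before wrapping it with `pandas_udf`. The sketch below assumes numpy and pandas are installed and that the data has the `id` and `val_count` columns used in the report; `gradient_naive`, `gradient_guarded`, `check_groups` and the sample frame are hypothetical names made up for illustration. The guarded variant returns NaN for groups with fewer than two rows, the case `np.gradient` rejects with its default `edge_order=1`; whether NaN is the right value for such groups depends on the analysis.

```python
import numpy as np
import pandas as pd


def gradient_naive(df: pd.DataFrame) -> pd.DataFrame:
    # Same logic as the UDF in the report: raises on single-row groups.
    df = df.copy()
    df["gradient_order_rate"] = np.gradient(df["val_count"])
    return df


def gradient_guarded(df: pd.DataFrame) -> pd.DataFrame:
    # np.gradient needs at least edge_order + 1 (= 2 by default) samples,
    # so groups that are too small get NaN instead of raising ValueError.
    df = df.copy()
    if len(df) < 2:
        df["gradient_order_rate"] = np.nan
    else:
        df["gradient_order_rate"] = np.gradient(df["val_count"].astype(float))
    return df


def check_groups(pdf: pd.DataFrame, key: str, fn) -> dict:
    # Hypothetical helper: run fn eagerly on every group of a local sample
    # and collect the exception raised for each group key, if any.
    failures = {}
    for name, group in pdf.groupby(key):
        try:
            fn(group)
        except Exception as exc:
            failures[name] = repr(exc)
    return failures


# Hypothetical sample data: group "b" has a single row.
sample = pd.DataFrame({"id": ["a", "a", "a", "b"], "val_count": [1, 3, 6, 4]})

print(check_groups(sample, "id", gradient_naive))    # only "b" fails
print(check_groups(sample, "id", gradient_guarded))  # {}
```

Once a function passes on a representative sample, it can be wrapped exactly as the decorator does, e.g. `pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)(gradient_guarded)`. Filling small groups with NaN keeps the new column numeric, so it still matches the `DoubleType` field in the schema.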