Repository: spark Updated Branches: refs/heads/master 07704c971 -> 11384893b
[SPARK-24208][SQL][FOLLOWUP] Move test cases to proper locations ## What changes were proposed in this pull request? The PR is a follow-up to move the test cases introduced by the original PR to their proper locations. ## How was this patch tested? moved UTs Author: Marco Gaido <marcogaid...@gmail.com> Closes #21751 from mgaido91/SPARK-24208_followup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/11384893 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/11384893 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/11384893 Branch: refs/heads/master Commit: 11384893b6ad09c0c8bc6a350bb9540d0d704bb4 Parents: 07704c9 Author: Marco Gaido <marcogaid...@gmail.com> Authored: Thu Jul 12 15:13:26 2018 -0700 Committer: Xiao Li <gatorsm...@gmail.com> Committed: Thu Jul 12 15:13:26 2018 -0700 ---------------------------------------------------------------------- python/pyspark/sql/tests.py | 32 ++++++++++---------- .../sql/catalyst/analysis/AnalysisSuite.scala | 18 +++++++++++ .../apache/spark/sql/GroupedDatasetSuite.scala | 12 -------- 3 files changed, 34 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/11384893/python/pyspark/sql/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 4404dbe..565654e 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -5471,6 +5471,22 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase): self.assertEqual(r.a, 'hi') self.assertEqual(r.b, 1) + def test_self_join_with_pandas(self): + import pyspark.sql.functions as F + + @F.pandas_udf('key long, col string', F.PandasUDFType.GROUPED_MAP) + def dummy_pandas_udf(df): + return df[['key', 'col']] + + df = self.spark.createDataFrame([Row(key=1, col='A'), Row(key=1, col='B'), + Row(key=2, 
col='C')]) + df_with_pandas = df.groupBy('key').apply(dummy_pandas_udf) + + # this was throwing an AnalysisException before SPARK-24208 + res = df_with_pandas.alias('temp0').join(df_with_pandas.alias('temp1'), + F.col('temp0.key') == F.col('temp1.key')) + self.assertEquals(res.count(), 5) + @unittest.skipIf( not _have_pandas or not _have_pyarrow, @@ -5925,22 +5941,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase): 'mixture.*aggregate function.*group aggregate pandas UDF'): df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect() - def test_self_join_with_pandas(self): - import pyspark.sql.functions as F - - @F.pandas_udf('key long, col string', F.PandasUDFType.GROUPED_MAP) - def dummy_pandas_udf(df): - return df[['key', 'col']] - - df = self.spark.createDataFrame([Row(key=1, col='A'), Row(key=1, col='B'), - Row(key=2, col='C')]) - dfWithPandas = df.groupBy('key').apply(dummy_pandas_udf) - - # this was throwing an AnalysisException before SPARK-24208 - res = dfWithPandas.alias('temp0').join(dfWithPandas.alias('temp1'), - F.col('temp0.key') == F.col('temp1.key')) - self.assertEquals(res.count(), 5) - @unittest.skipIf( not _have_pandas or not _have_pyarrow, http://git-wip-us.apache.org/repos/asf/spark/blob/11384893/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index cd85795..bbcdf6c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -21,6 +21,7 @@ import java.util.TimeZone import org.scalatest.Matchers +import org.apache.spark.api.python.PythonEvalType import org.apache.spark.sql.catalyst.TableIdentifier import 
org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ @@ -557,4 +558,21 @@ class AnalysisSuite extends AnalysisTest with Matchers { SubqueryAlias("tbl", testRelation))) assertAnalysisError(barrier, Seq("cannot resolve '`tbl.b`'")) } + + test("SPARK-24208: analysis fails on self-join with FlatMapGroupsInPandas") { + val pythonUdf = PythonUDF("pyUDF", null, + StructType(Seq(StructField("a", LongType))), + Seq.empty, + PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, + true) + val output = pythonUdf.dataType.asInstanceOf[StructType].toAttributes + val project = Project(Seq(UnresolvedAttribute("a")), testRelation) + val flatMapGroupsInPandas = FlatMapGroupsInPandas( + Seq(UnresolvedAttribute("a")), pythonUdf, output, project) + val left = SubqueryAlias("temp0", flatMapGroupsInPandas) + val right = SubqueryAlias("temp1", flatMapGroupsInPandas) + val join = Join(left, right, Inner, None) + assertAnalysisSuccess( + Project(Seq(UnresolvedAttribute("temp0.a"), UnresolvedAttribute("temp1.a")), join)) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/11384893/sql/core/src/test/scala/org/apache/spark/sql/GroupedDatasetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GroupedDatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GroupedDatasetSuite.scala index bd54ea4..147c0b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/GroupedDatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/GroupedDatasetSuite.scala @@ -93,16 +93,4 @@ class GroupedDatasetSuite extends QueryTest with SharedSQLContext { } datasetWithUDF.unpersist(true) } - - test("SPARK-24208: analysis fails on self-join with FlatMapGroupsInPandas") { - val df = datasetWithUDF.groupBy("s").flatMapGroupsInPandas(PythonUDF( - "pyUDF", - null, - StructType(Seq(StructField("s", LongType))), - Seq.empty, - PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, - true)) 
- val df1 = df.alias("temp0").join(df.alias("temp1"), $"temp0.s" === $"temp1.s") - df1.queryExecution.assertAnalyzed() - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org