[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/23248#discussion_r239925749 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper { expressions.flatMap(collectEvaluableUDFs) } - def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { -case plan: LogicalPlan => extract(plan) + def apply(plan: LogicalPlan): LogicalPlan = plan match { +// SPARK-26293: A subquery will be rewritten into join later, and will go through this rule +// eventually. Here we skip subquery, as Python UDF only needs to be extracted once. +case _: Subquery => plan --- End diff -- I see. If it's common to skip Subquery in other rules, I guess it's ok to put it in here as well. But it would definitely be helpful to establish some kind of guidance, maybe something like "All optimizer rules should skip Subquery because OptimizeSubqueries will execute them anyway"? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r239922856 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -144,24 +282,107 @@ case class WindowInPandasExec( queue.close() } - val inputProj = UnsafeProjection.create(allInputs, child.output) - val pythonInput = grouped.map { case (_, rows) => -rows.map { row => - queue.add(row.asInstanceOf[UnsafeRow]) - inputProj(row) + val stream = iter.map { row => +queue.add(row.asInstanceOf[UnsafeRow]) +row + } + + val pythonInput = new Iterator[Iterator[UnsafeRow]] { + +// Manage the stream and the grouping. +var nextRow: UnsafeRow = null +var nextGroup: UnsafeRow = null +var nextRowAvailable: Boolean = false +private[this] def fetchNextRow() { + nextRowAvailable = stream.hasNext + if (nextRowAvailable) { +nextRow = stream.next().asInstanceOf[UnsafeRow] +nextGroup = grouping(nextRow) + } else { +nextRow = null +nextGroup = null + } +} +fetchNextRow() + +// Manage the current partition. +val buffer: ExternalAppendOnlyUnsafeRowArray = + new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold) +var bufferIterator: Iterator[UnsafeRow] = _ + +val indexRow = new SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType)) + +val frames = factories.map(_(indexRow)) + +private[this] def fetchNextPartition() { + // Collect all the rows in the current partition. + // Before we start to fetch new input rows, make a copy of nextGroup. + val currentGroup = nextGroup.copy() + + // clear last partition + buffer.clear() + + while (nextRowAvailable && nextGroup == currentGroup) { --- End diff -- I did fix the original `WindowExec` as well but of course we can fix it in a separate PR. I will revert my latest change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r239587375 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -144,24 +282,107 @@ case class WindowInPandasExec( queue.close() } - val inputProj = UnsafeProjection.create(allInputs, child.output) - val pythonInput = grouped.map { case (_, rows) => -rows.map { row => - queue.add(row.asInstanceOf[UnsafeRow]) - inputProj(row) + val stream = iter.map { row => +queue.add(row.asInstanceOf[UnsafeRow]) +row + } + + val pythonInput = new Iterator[Iterator[UnsafeRow]] { + +// Manage the stream and the grouping. +var nextRow: UnsafeRow = null +var nextGroup: UnsafeRow = null +var nextRowAvailable: Boolean = false +private[this] def fetchNextRow() { + nextRowAvailable = stream.hasNext + if (nextRowAvailable) { +nextRow = stream.next().asInstanceOf[UnsafeRow] +nextGroup = grouping(nextRow) + } else { +nextRow = null +nextGroup = null + } +} +fetchNextRow() + +// Manage the current partition. +val buffer: ExternalAppendOnlyUnsafeRowArray = + new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold) +var bufferIterator: Iterator[UnsafeRow] = _ + +val indexRow = new SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType)) + +val frames = factories.map(_(indexRow)) + +private[this] def fetchNextPartition() { + // Collect all the rows in the current partition. + // Before we start to fetch new input rows, make a copy of nextGroup. + val currentGroup = nextGroup.copy() + + // clear last partition + buffer.clear() + + while (nextRowAvailable && nextGroup == currentGroup) { --- End diff -- Good catch. @ueshin Do you mind double checking what I am doing now is correct? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r239587065 --- Diff: python/pyspark/sql/tests/test_pandas_udf_window.py --- @@ -87,8 +96,34 @@ def ordered_window(self): def unpartitioned_window(self): return Window.partitionBy() +@property +def sliding_row_window(self): +return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 1) + +@property +def sliding_range_window(self): +return Window.partitionBy('id').orderBy('v').rangeBetween(-2, 4) + +@property +def growing_row_window(self): +return Window.partitionBy('id').orderBy('v').rowsBetween(Window.unboundedPreceding, 3) + +@property +def growing_range_window(self): +return Window.partitionBy('id').orderBy('v') \ +.rangeBetween(Window.unboundedPreceding, 4) + +@property +def shrinking_row_window(self): +return Window.partitionBy('id').orderBy('v').rowsBetween(-2, Window.unboundedFollowing) + +@property +def shrinking_range_window(self): +return Window.partitionBy('id').orderBy('v') \ +.rangeBetween(-3, Window.unboundedFollowing) + def test_simple(self): -from pyspark.sql.functions import mean +from pyspark.sql.functions import pandas_udf, PandasUDFType, percent_rank, mean, max --- End diff -- Reverted --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r239587136 --- Diff: python/pyspark/sql/tests/test_pandas_udf_window.py --- @@ -245,11 +278,101 @@ def test_invalid_args(self): foo_udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUPED_MAP) df.withColumn('v2', foo_udf(df['v']).over(w)) -with QuietTest(self.sc): -with self.assertRaisesRegexp( -AnalysisException, -'.*Only unbounded window frame is supported.*'): -df.withColumn('mean_v', mean_udf(df['v']).over(ow)) +def test_bounded_simple(self): +from pyspark.sql.functions import mean, max, min, count + +df = self.data +w1 = self.sliding_row_window +w2 = self.shrinking_range_window + +plus_one = self.python_plus_one +count_udf = self.pandas_agg_count_udf +mean_udf = self.pandas_agg_mean_udf +max_udf = self.pandas_agg_max_udf +min_udf = self.pandas_agg_min_udf + +result1 = df.withColumn('mean_v', mean_udf(plus_one(df['v'])).over(w1)) \ +.withColumn('count_v', count_udf(df['v']).over(w2)) \ +.withColumn('max_v', max_udf(df['v']).over(w2)) \ +.withColumn('min_v', min_udf(df['v']).over(w1)) + +expected1 = df.withColumn('mean_v', mean(plus_one(df['v'])).over(w1)) \ +.withColumn('count_v', count(df['v']).over(w2)) \ +.withColumn('max_v', max(df['v']).over(w2)) \ +.withColumn('min_v', min(df['v']).over(w1)) + +self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) + +def test_growing_window(self): +from pyspark.sql.functions import mean + +df = self.data +w1 = self.growing_row_window +w2 = self.growing_range_window + +mean_udf = self.pandas_agg_mean_udf + +result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \ +.withColumn('m2', mean_udf(df['v']).over(w2)) + +expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \ +.withColumn('m2', mean(df['v']).over(w2)) + +self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) + +def test_sliding_window(self): +from pyspark.sql.functions import mean + +df = self.data +w1 = 
self.sliding_row_window +w2 = self.sliding_range_window + +mean_udf = self.pandas_agg_mean_udf + +result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \ +.withColumn('m2', mean_udf(df['v']).over(w2)) + +expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \ +.withColumn('m2', mean(df['v']).over(w2)) + +self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) + +def test_shrinking_window(self): +from pyspark.sql.functions import mean + +df = self.data +w1 = self.shrinking_row_window +w2 = self.shrinking_range_window + +mean_udf = self.pandas_agg_mean_udf + +result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \ +.withColumn('m2', mean_udf(df['v']).over(w2)) + +expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \ +.withColumn('m2', mean(df['v']).over(w2)) + +self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) + +def test_bounded_mixed(self): +from pyspark.sql.functions import mean, max, min, count --- End diff -- Yes - removed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r239587089 --- Diff: python/pyspark/sql/tests/test_pandas_udf_window.py --- @@ -231,12 +266,10 @@ def test_array_type(self): self.assertEquals(result1.first()['v2'], [1.0, 2.0]) def test_invalid_args(self): -from pyspark.sql.functions import pandas_udf, PandasUDFType +from pyspark.sql.functions import mean, pandas_udf, PandasUDFType --- End diff -- Reverted --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r239587020 --- Diff: python/pyspark/sql/tests/test_pandas_udf_window.py --- @@ -44,9 +44,18 @@ def python_plus_one(self): @property def pandas_scalar_time_two(self): -from pyspark.sql.functions import pandas_udf +from pyspark.sql.functions import pandas_udf, PandasUDFType --- End diff -- Sorry. Reverted --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/23248#discussion_r239565253 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper { expressions.flatMap(collectEvaluableUDFs) } - def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { -case plan: LogicalPlan => extract(plan) + def apply(plan: LogicalPlan): LogicalPlan = plan match { +// SPARK-26293: A subquery will be rewritten into join later, and will go through this rule +// eventually. Here we skip subquery, as Python UDF only needs to be extracted once. +case _: Subquery => plan --- End diff -- Personally I found it a bit confusing when two seemingly unrelated things are put together (Subquery and ExtractPythonUDFs). I wonder if it's sufficient to make ExtractPythonUDFs idempotent? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22305 Hi @BryanCutler @HyukjinKwon @ueshin , mind taking another look? I think this is in good shape. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r235417425 --- Diff: python/pyspark/worker.py --- @@ -154,6 +154,47 @@ def wrapped(*series): return lambda *a: (wrapped(*a), arrow_return_type) +def wrap_bounded_window_agg_pandas_udf(f, return_type): +arrow_return_type = to_arrow_type(return_type) + +def wrapped(begin_index, end_index, *series): +import numpy as np +import pandas as pd +result = [] +for i in range(0, len(begin_index)): +begin = begin_index[i] +end = end_index[i] +range_index = np.arange(begin, end) +# Note: Create a slice from a series is actually pretty expensive to +# do for each window. However, there is no way to reduce/eliminate +# the cost of creating sub series here AFAIK. +# TODO: s.take might be the best way to create sub series +series_slices = [s.take(range_index) for s in series] +result.append(f(*series_slices)) +return pd.Series(result) + +return lambda *a: (wrapped(*a), arrow_return_type) + + +def wrap_bounded_window_agg_pandas_udf_np(f, return_type): --- End diff -- This is removed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22305 @BryanCutler @HyukjinKwon @ueshin I have addressed all the comments so far. Could you please take another look? Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r235182927 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -63,7 +65,7 @@ private[spark] object PythonEvalType { */ private[spark] abstract class BasePythonRunner[IN, OUT]( funcs: Seq[ChainedPythonFunctions], -evalType: Int, +evalTypes: Seq[Int], --- End diff -- I have reverted the evalType change and use a runner config instead. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r234790479 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.arrow.ArrowUtils -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.execution.window._ +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +/** + * This class calculates and outputs windowed aggregates over the rows in a single partition. + * + * It is very similar to [[WindowExec]] and has similar logic. The main difference is that this + * node doesn't not compute any window aggregation values. Instead, it computes the lower and + * upper bound for each window (i.e. window bounds) and pass the data and indices to python work + * to do the actual window aggregation. + * + * It currently materialize all data associated with the same partition key and pass them to --- End diff -- Thanks! Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r234790633 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -73,68 +118,151 @@ case class WindowInPandasExec( } /** - * Create the resulting projection. - * - * This method uses Code Generation. It can only be used on the executor side. + * Get all relevant helper functions and data structures for window bounds * - * @param expressions unbound ordered function expressions. - * @return the final resulting projection. + * This function returns: + * (1) Total number of window bound indices in the python input row + * (2) Function from frame index to its lower bound column index in the python input row + * (3) Function from frame index to its upper bound column index in the python input row + * (4) Function indicates whether a frame requires window bound indices + * (5) Function from frame index to its eval type */ - private[this] def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = { -val references = expressions.zipWithIndex.map { case (e, i) => - // Results of window expressions will be on the right side of child's output - BoundReference(child.output.size + i, e.dataType, e.nullable) + private def computeWindowBoundHelpers( + factories: Seq[InternalRow => WindowFunctionFrame] + ): (Int, Int => Int, Int => Int, Int => Boolean, Int => Int) = { --- End diff -- Sounds good. I added a type for that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r234790364 --- Diff: python/pyspark/sql/tests.py --- @@ -89,6 +89,7 @@ from pyspark.sql.types import _merge_type from pyspark.tests import QuietTest, ReusedPySparkTestCase, PySparkTestCase, SparkSubmitTests from pyspark.sql.functions import UserDefinedFunction, sha2, lit +import pyspark.sql.functions as F --- End diff -- Thanks! Removed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r234790403 --- Diff: python/pyspark/sql/tests.py --- @@ -7064,12 +7098,104 @@ def test_invalid_args(self): foo_udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUPED_MAP) df.withColumn('v2', foo_udf(df['v']).over(w)) -with QuietTest(self.sc): -with self.assertRaisesRegexp( -AnalysisException, -'.*Only unbounded window frame is supported.*'): -df.withColumn('mean_v', mean_udf(df['v']).over(ow)) +def test_bounded_simple(self): +from pyspark.sql.functions import mean, max, min, count + +df = self.data +w1 = self.sliding_row_window +w2 = self.shrinking_range_window + +plus_one = self.python_plus_one +count_udf = self.pandas_agg_count_udf +mean_udf = self.pandas_agg_mean_udf +max_udf = self.pandas_agg_max_udf +min_udf = self.pandas_agg_min_udf + +result1 = df.withColumn('mean_v', mean_udf(plus_one(df['v'])).over(w1))\ +.withColumn('count_v', count_udf(df['v']).over(w2)) \ +.withColumn('max_v', max_udf(df['v']).over(w2)) \ +.withColumn('min_v', min_udf(df['v']).over(w1)) \ + +expected1 = df.withColumn('mean_v', mean(plus_one(df['v'])).over(w1))\ +.withColumn('count_v', count(df['v']).over(w2)) \ +.withColumn('max_v', max(df['v']).over(w2)) \ +.withColumn('min_v', min(df['v']).over(w1)) \ + +self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) + +def test_growing_window(self): +from pyspark.sql.functions import mean + +df = self.data +w1 = self.growing_row_window +w2 = self.growing_range_window +mean_udf = self.pandas_agg_mean_udf + +result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \ +.withColumn('m2', mean_udf(df['v']).over(w2)) + +expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \ +.withColumn('m2', mean(df['v']).over(w2)) + +self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) + +def test_sliding_window(self): +from pyspark.sql.functions import mean + +df = self.data +w1 = self.sliding_row_window +w2 = 
self.sliding_range_window + +mean_udf = self.pandas_agg_mean_udf + +result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \ +.withColumn('m2', mean_udf(df['v']).over(w2)) + +expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \ +.withColumn('m2', mean(df['v']).over(w2)) + +result1.show() +expected1.show() --- End diff -- Thanks! Removed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r232393476 --- Diff: python/pyspark/sql/tests.py --- @@ -6323,6 +6333,33 @@ def ordered_window(self): def unpartitioned_window(self): return Window.partitionBy() +@property +def sliding_row_window(self): +return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 1) + +@property +def sliding_range_window(self): +from pyspark.sql.functions import lit +return Window.partitionBy('id').orderBy('v').rangeBetween(lit(-2.0), lit(4.0)) --- End diff -- Thanks! I fixed this to use the rangeBetween(Long, Long) API --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r232393452 --- Diff: python/pyspark/sql/tests.py --- @@ -6323,6 +6333,33 @@ def ordered_window(self): def unpartitioned_window(self): return Window.partitionBy() +@property +def sliding_row_window(self): +return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 1) + +@property +def sliding_range_window(self): +from pyspark.sql.functions import lit +return Window.partitionBy('id').orderBy('v').rangeBetween(lit(-2.0), lit(4.0)) + +@property +def growing_row_window(self): +return Window.partitionBy('id').orderBy('v').rowsBetween(Window.unboundedPreceding, 3) + +@property +def growing_range_window(self): +return Window.partitionBy('id').orderBy('v') \ +.rangeBetween(F.unboundedPreceding(), F.lit(4.0)) --- End diff -- Thanks! I fixed this to use the rangeBetween(Long, Long) API --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r232393335 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.arrow.ArrowUtils -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.execution.window._ +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +/** + * This class calculates and outputs windowed aggregates over the rows in a single partition. + * + * It is very similar to [[WindowExec]] and has similar logic. The main difference is that this + * node doesn't not compute any window aggregation values. Instead, it computes the lower and + * upper bound for each window (i.e. window bounds) and pass the data and indices to python work + * to do the actual window aggregation. + * + * It currently materialize all data associated with the same partition key and pass them to + * Python. This is not strictly necessary for sliding windows and can be improved (by slicing + * data into overlapping small chunks and stitch them together). + * + * This class groups window expressions by their window boundaries so that window expressions + * with the same window boundaries can share the same window bounds. The window bounds are + * prepended to the data passed to the python worker. 
+ * + * For example, if we have: + * avg(v) over specifiedwindowframe(RowFrame, -5, 5), + * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing), + * avg(v) over specifiedwindowframe(RowFrame, -3, 3), + * max(v) over specifiedwindowframe(RowFrame, -3, 3) + * + * The python input will look like: + * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v) + * + * where w1 is specifiedwindowframe(RowFrame, -5, 5) + * w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing) + * w3 is specifiedwindowframe(RowFrame, -3, 3) + * + * Note that w2 doesn't have bound indices in the python input because its unbounded window + * so it's bound indices will always be the same. + * + * Unbounded window also have a different eval type, because: + * (1) It doesn't have bound indices as input + * (2) The udf only needs to be evaluated once the in python worker (because the udf is + * deterministic and window bounds are the same for all windows) + * + * The logic to compute window bounds is delegated to [[WindowFunctionFrame]] and shared with + * [[WindowExec]] + * + * Note this doesn't support partial aggregation and all aggregation is computed from the entire + * window. + */ case class WindowInPandasExec( windowExpression: Seq[NamedExpression], partitionSpec: Seq[Expression], orderSpec: Seq[SortOrder], -child: SparkPlan) extends UnaryExecNode { +child: SparkPlan +) extends WindowExecBase(windowExpression, partitionSpec, orderSpec, child) { --- End diff -- Done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r232393305 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -73,68 +118,147 @@ case class WindowInPandasExec( } /** - * Create the resulting projection. - * - * This method uses Code Generation. It can only be used on the executor side. + * Helper function to get all relevant helper functions and data structures for window bounds * - * @param expressions unbound ordered function expressions. - * @return the final resulting projection. + * This function returns: + * (1) Total number of window bound indices in the python input row + * (2) Function from frame index to its lower bound column index in the python input row + * (3) Function from frame index to its upper bound column index in the python input row + * (4) Function that returns a frame requires window bound indices in the python input row + * (unbounded window doesn't need it) + * (5) Function from frame index to its eval type */ - private[this] def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = { -val references = expressions.zipWithIndex.map { case (e, i) => - // Results of window expressions will be on the right side of child's output - BoundReference(child.output.size + i, e.dataType, e.nullable) + private def computeWindowBoundHelpers( + factories: Seq[InternalRow => WindowFunctionFrame] + ): (Int, Int => Int, Int => Int, Int => Boolean, Int => Int) = { +val dummyRow = new SpecificInternalRow() +val functionFrames = factories.map(_(dummyRow)) + +val evalTypes = functionFrames.map { + case _: UnboundedWindowFunctionFrame => PythonEvalType.SQL_UNBOUNDED_WINDOW_AGG_PANDAS_UDF + case _ => PythonEvalType.SQL_BOUNDED_WINDOW_AGG_PANDAS_UDF +} + +val requiredIndices = functionFrames.map { + case _: UnboundedWindowFunctionFrame => 0 + case _ => 2 } -val unboundToRefMap = expressions.zip(references).toMap -val 
patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap)) -UnsafeProjection.create( - child.output ++ patchedWindowExpression, - child.output) + +val upperBoundIndices = requiredIndices.scan(0)(_ + _).tail + +val boundIndices = (requiredIndices zip upperBoundIndices).map {case (num, upperBoundIndex) => --- End diff -- Done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r232393187 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -73,68 +118,147 @@ case class WindowInPandasExec( } /** - * Create the resulting projection. - * - * This method uses Code Generation. It can only be used on the executor side. + * Helper function to get all relevant helper functions and data structures for window bounds * - * @param expressions unbound ordered function expressions. - * @return the final resulting projection. + * This function returns: + * (1) Total number of window bound indices in the python input row + * (2) Function from frame index to its lower bound column index in the python input row + * (3) Function from frame index to its upper bound column index in the python input row + * (4) Function that returns a frame requires window bound indices in the python input row + * (unbounded window doesn't need it) + * (5) Function from frame index to its eval type */ - private[this] def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = { -val references = expressions.zipWithIndex.map { case (e, i) => - // Results of window expressions will be on the right side of child's output - BoundReference(child.output.size + i, e.dataType, e.nullable) + private def computeWindowBoundHelpers( + factories: Seq[InternalRow => WindowFunctionFrame] + ): (Int, Int => Int, Int => Int, Int => Boolean, Int => Int) = { +val dummyRow = new SpecificInternalRow() +val functionFrames = factories.map(_(dummyRow)) + +val evalTypes = functionFrames.map { + case _: UnboundedWindowFunctionFrame => PythonEvalType.SQL_UNBOUNDED_WINDOW_AGG_PANDAS_UDF + case _ => PythonEvalType.SQL_BOUNDED_WINDOW_AGG_PANDAS_UDF +} + +val requiredIndices = functionFrames.map { + case _: UnboundedWindowFunctionFrame => 0 + case _ => 2 } -val unboundToRefMap = expressions.zip(references).toMap -val 
patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap)) -UnsafeProjection.create( - child.output ++ patchedWindowExpression, - child.output) + +val upperBoundIndices = requiredIndices.scan(0)(_ + _).tail + +val boundIndices = (requiredIndices zip upperBoundIndices).map {case (num, upperBoundIndex) => +if (num == 0) { + // Sentinel values for unbounded window + (-1, -1) +} else { + (upperBoundIndex - 2, upperBoundIndex - 1) +} +} + +def lowerBoundIndex(frameIndex: Int) = boundIndices(frameIndex)._1 +def upperBoundIndex(frameIndex: Int) = boundIndices(frameIndex)._2 +def frameEvalType(frameIndex: Int) = evalTypes(frameIndex) +def frameRequireIndex(frameIndex: Int) = + evalTypes(frameIndex) == PythonEvalType.SQL_BOUNDED_WINDOW_AGG_PANDAS_UDF + +(requiredIndices.sum, lowerBoundIndex, upperBoundIndex, frameRequireIndex, frameEvalType) } protected override def doExecute(): RDD[InternalRow] = { -val inputRDD = child.execute() +// Unwrap the expressions and factories from the map. +val expressionsWithFrameIndex = + windowFrameExpressionFactoryPairs.map(_._1).zipWithIndex.flatMap { +case (buffer, frameIndex) => buffer.map( expr => (expr, frameIndex)) + } + +val expressions = expressionsWithFrameIndex.map(_._1) +val expressionIndexToFrameIndex = + expressionsWithFrameIndex.map(_._2).zipWithIndex.map(_.swap).toMap + +val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray + +val (numBoundIndices, lowerBoundIndex, upperBoundIndex, frameRequireIndex, frameEvalType) = + computeWindowBoundHelpers(factories) + +val funcEvalTypes = expressions.indices.map( + i => frameEvalType(expressionIndexToFrameIndex(i))) + +val numFrames = factories.length + +val inMemoryThreshold = sqlContext.conf.windowExecBufferInMemoryThreshold +val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold --- End diff -- Thanks! Done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r232388369 --- Diff: python/pyspark/worker.py --- @@ -154,6 +154,47 @@ def wrapped(*series): return lambda *a: (wrapped(*a), arrow_return_type) +def wrap_bounded_window_agg_pandas_udf(f, return_type): +arrow_return_type = to_arrow_type(return_type) + +def wrapped(begin_index, end_index, *series): +import numpy as np +import pandas as pd +result = [] +for i in range(0, len(begin_index)): +begin = begin_index[i] +end = end_index[i] +range_index = np.arange(begin, end) +# Note: Create a slice from a series is actually pretty expensive to +# do for each window. However, there is no way to reduce/eliminate +# the cost of creating sub series here AFAIK. +# TODO: s.take might be the best way to create sub series +series_slices = [s.take(range_index) for s in series] --- End diff -- Good catch, `s[begin:end]` is actually faster --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r232084279 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -73,68 +118,147 @@ case class WindowInPandasExec( } /** - * Create the resulting projection. - * - * This method uses Code Generation. It can only be used on the executor side. + * Helper function to get all relevant helper functions and data structures for window bounds * - * @param expressions unbound ordered function expressions. - * @return the final resulting projection. + * This function returns: + * (1) Total number of window bound indices in the python input row + * (2) Function from frame index to its lower bound column index in the python input row + * (3) Function from frame index to its upper bound column index in the python input row + * (4) Function that returns a frame requires window bound indices in the python input row + * (unbounded window doesn't need it) + * (5) Function from frame index to its eval type */ - private[this] def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = { -val references = expressions.zipWithIndex.map { case (e, i) => - // Results of window expressions will be on the right side of child's output - BoundReference(child.output.size + i, e.dataType, e.nullable) + private def computeWindowBoundHelpers( + factories: Seq[InternalRow => WindowFunctionFrame] + ): (Int, Int => Int, Int => Int, Int => Boolean, Int => Int) = { +val dummyRow = new SpecificInternalRow() --- End diff -- Yes, this is for figuring out the types of each `WindowFunctionFrame`. These function frames are created temporarily and thrown away when this function returns so it's not great... However in order to create the proper function frames we would need to know the total number of window indices, so it's a bit of a chicken-and-egg problem here... 
I don't see an easy way :( --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22305 No worries. Thank you @HyukjinKwon and @ueshin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22305 Hey @gatorsmile it has been quite a while with no review progress on this. @BryanCutler has some initial comments but I want to get more people's feedback before addressing those. Since now 2.4 is out, I figured you guys have some more bandwidth. Do you think we can get some reviewers' attention on this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r227591746 --- Diff: python/pyspark/sql/tests.py --- @@ -6323,6 +6333,33 @@ def ordered_window(self): def unpartitioned_window(self): return Window.partitionBy() +@property +def sliding_row_window(self): --- End diff -- I think they could be variables. I don't particularly care one way or the other. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r227591518 --- Diff: python/pyspark/sql/tests.py --- @@ -6481,12 +6516,116 @@ def test_invalid_args(self): foo_udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUPED_MAP) df.withColumn('v2', foo_udf(df['v']).over(w)) -with QuietTest(self.sc): -with self.assertRaisesRegexp( -AnalysisException, -'.*Only unbounded window frame is supported.*'): -df.withColumn('mean_v', mean_udf(df['v']).over(ow)) +def test_bounded_simple(self): +from pyspark.sql.functions import mean, max, min, count + +df = self.data +w1 = self.sliding_row_window +w2 = self.shrinking_range_window + +plus_one = self.python_plus_one +count_udf = self.pandas_agg_count_udf +mean_udf = self.pandas_agg_mean_udf +max_udf = self.pandas_agg_max_udf +min_udf = self.pandas_agg_min_udf + +result1 = df.withColumn('mean_v', mean_udf(plus_one(df['v'])).over(w1))\ +.withColumn('count_v', count_udf(df['v']).over(w2)) \ +.withColumn('max_v', max_udf(df['v']).over(w2)) \ +.withColumn('min_v', min_udf(df['v']).over(w1)) \ + +expected1 = df.withColumn('mean_v', mean(plus_one(df['v'])).over(w1))\ +.withColumn('count_v', count(df['v']).over(w2)) \ +.withColumn('max_v', max(df['v']).over(w2)) \ +.withColumn('min_v', min(df['v']).over(w1)) \ + +result1.explain(True) +expected1.explain(True) + +result1.show() +expected1.show() --- End diff -- Ah, missed those, will do, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r227591428 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -63,7 +65,7 @@ private[spark] object PythonEvalType { */ private[spark] abstract class BasePythonRunner[IN, OUT]( funcs: Seq[ChainedPythonFunctions], -evalType: Int, +evalTypes: Seq[Int], --- End diff -- > So couldn't you just send an index that encompasses the entire range for unbounded This is actually what I first did. However, I think this would require sending more data than necessary for the unbounded case. In the worst case it will be 3x the number of columns (begin_index, end_index, data) compared to just one column (data). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22305 @felixcheung I am waiting for some in-depth review. @ueshin do you have some time to review this in the near future? Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r224548624 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -63,7 +65,7 @@ private[spark] object PythonEvalType { */ private[spark] abstract class BasePythonRunner[IN, OUT]( funcs: Seq[ChainedPythonFunctions], -evalType: Int, +evalTypes: Seq[Int], --- End diff -- I see your point - I can see this being used for other things too, for example, numpy variant vectorized UDFs, or Window transform UDFs for unbounded window (n -> n mapping for unbounded window, such as rank). I chose this approach because of the flexibility. For this particular case, it is possible to distinguish between bounded/unbounded, for example, maybe sending something in the arg offsets or something like that, but this would be using arg offsets for something else... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r223762966 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -63,7 +65,7 @@ private[spark] object PythonEvalType { */ private[spark] abstract class BasePythonRunner[IN, OUT]( funcs: Seq[ChainedPythonFunctions], -evalType: Int, +evalTypes: Seq[Int], --- End diff -- There might be other ways to do this. I like extending a single eval type to a seq of eval types because it seems pretty simple and flexible to me. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r223761447 --- Diff: python/pyspark/worker.py --- @@ -154,6 +154,47 @@ def wrapped(*series): return lambda *a: (wrapped(*a), arrow_return_type) +def wrap_bounded_window_agg_pandas_udf(f, return_type): +arrow_return_type = to_arrow_type(return_type) + +def wrapped(begin_index, end_index, *series): +import numpy as np +import pandas as pd +result = [] +for i in range(0, len(begin_index)): +begin = begin_index[i] +end = end_index[i] +range_index = np.arange(begin, end) +# Note: Create a slice from a series is actually pretty expensive to +# do for each window. However, there is no way to reduce/eliminate +# the cost of creating sub series here AFAIK. +# TODO: s.take might be the best way to create sub series +series_slices = [s.take(range_index) for s in series] --- End diff -- I forgot the reason I chose `take` over `iloc`. I will do a bit more investigation and report back. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r223754106 --- Diff: python/pyspark/worker.py --- @@ -154,6 +154,47 @@ def wrapped(*series): return lambda *a: (wrapped(*a), arrow_return_type) +def wrap_bounded_window_agg_pandas_udf(f, return_type): +arrow_return_type = to_arrow_type(return_type) + +def wrapped(begin_index, end_index, *series): +import numpy as np +import pandas as pd +result = [] +for i in range(0, len(begin_index)): +begin = begin_index[i] +end = end_index[i] +range_index = np.arange(begin, end) +# Note: Create a slice from a series is actually pretty expensive to +# do for each window. However, there is no way to reduce/eliminate +# the cost of creating sub series here AFAIK. +# TODO: s.take might be the best way to create sub series +series_slices = [s.take(range_index) for s in series] +result.append(f(*series_slices)) +return pd.Series(result) + +return lambda *a: (wrapped(*a), arrow_return_type) + + +def wrap_bounded_window_agg_pandas_udf_np(f, return_type): --- End diff -- Yes this is not used. I leave it here just to show the difference for now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22305 @BryanCutler Yes that was a typo :) Thanks! I am also +1 to support numpy data structure in addition to Pandas. So happy to discuss here or separately. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22305 Hey folks, any thoughts on this PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22620: [SPARK-25601][PYTHON] Register Grouped aggregate ...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22620#discussion_r222698014 --- Diff: python/pyspark/sql/udf.py --- @@ -310,9 +319,11 @@ def register(self, name, f, returnType=None): "Invalid returnType: data type can not be specified when f is" "a user-defined function, but got %s." % returnType) if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF, - PythonEvalType.SQL_SCALAR_PANDAS_UDF]: + PythonEvalType.SQL_SCALAR_PANDAS_UDF, + PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]: --- End diff -- I opened https://issues.apache.org/jira/browse/SPARK-25640 to track this. To be clear, this is transparent to end users, but I agree it can be confusing to developers. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22620: [SPARK-25601][PYTHON] Register Grouped aggregate ...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22620#discussion_r222456993 --- Diff: python/pyspark/sql/udf.py --- @@ -310,9 +319,11 @@ def register(self, name, f, returnType=None): "Invalid returnType: data type can not be specified when f is" "a user-defined function, but got %s." % returnType) if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF, - PythonEvalType.SQL_SCALAR_PANDAS_UDF]: + PythonEvalType.SQL_SCALAR_PANDAS_UDF, + PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]: --- End diff -- We don't need it here: Users specify GROUPED_AGG only. GROUPED_AGG is turned to WINDOW_AGG eval type in WindowInPandasExec. Admittedly, there is a bit of confusion here that we can improve. Before WINDOW_AGG, we just hadn't had a user-specified UDF type that maps to multiple eval types. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22620: [SPARK-25601][PYTHON] Register Grouped aggregate UDF Vec...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22620 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22620: [SPARK-25601][PYTHON] Register Grouped aggregate ...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22620#discussion_r222421940 --- Diff: python/pyspark/sql/udf.py --- @@ -298,6 +298,15 @@ def register(self, name, f, returnType=None): >>> spark.sql("SELECT add_one(id) FROM range(3)").collect() # doctest: +SKIP [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)] +>>> @pandas_udf("integer", PandasUDFType.GROUPED_AGG) # doctest: +SKIP +... def sum_udf(v): +... return v.sum() +... +>>> _ = spark.udf.register("sum_udf", sum_udf) # doctest: +SKIP --- End diff -- Ha. I see.. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22620: [SPARK-25601][PYTHON] Register Grouped aggregate ...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22620#discussion_r222411585 --- Diff: python/pyspark/sql/udf.py --- @@ -298,6 +298,15 @@ def register(self, name, f, returnType=None): >>> spark.sql("SELECT add_one(id) FROM range(3)").collect() # doctest: +SKIP [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)] +>>> @pandas_udf("integer", PandasUDFType.GROUPED_AGG) # doctest: +SKIP +... def sum_udf(v): +... return v.sum() +... +>>> _ = spark.udf.register("sum_udf", sum_udf) # doctest: +SKIP --- End diff -- what is the "_ =" thing here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22305 Gentle ping @cloud-fan @gatorsmile @HyukjinKwon @ueshin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22305 cc @HyukjinKwon @ueshin @BryanCutler @felixcheung This PR is ready for review. I have updated the description so hopefully it is easier to review. Please let me know if you need any clarification or if there is anything I can do to help with the review. Thanks! Li --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [WIP][SPARK-24561][SQL][Python] User-defined wind...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r218244042 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.window + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, TimestampType} + +private[sql] abstract class WindowExecBase( +windowExpression: Seq[NamedExpression], +partitionSpec: Seq[Expression], +orderSpec: Seq[SortOrder], +child: SparkPlan) extends UnaryExecNode { + + /** + * Create the resulting projection. + * + * This method uses Code Generation. It can only be used on the executor side. + * + * @param expressions unbound ordered function expressions. + * @return the final resulting projection. 
+ */ + protected def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = { +val references = expressions.zipWithIndex.map { case (e, i) => + // Results of window expressions will be on the right side of child's output + BoundReference(child.output.size + i, e.dataType, e.nullable) +} +val unboundToRefMap = expressions.zip(references).toMap +val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap)) +UnsafeProjection.create( + child.output ++ patchedWindowExpression, + child.output) + } + + /** + * Create a bound ordering object for a given frame type and offset. A bound ordering object is + * used to determine which input row lies within the frame boundaries of an output row. + * + * This method uses Code Generation. It can only be used on the executor side. + * + * @param frame to evaluate. This can either be a Row or Range frame. + * @param bound with respect to the row. + * @param timeZone the session local timezone for time related calculations. + * @return a bound ordering object. + */ + protected def createBoundOrdering( + frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = { +(frame, bound) match { + case (RowFrame, CurrentRow) => +RowBoundOrdering(0) + + case (RowFrame, IntegerLiteral(offset)) => +RowBoundOrdering(offset) + + case (RangeFrame, CurrentRow) => +val ordering = newOrdering(orderSpec, child.output) +RangeBoundOrdering(ordering, IdentityProjection, IdentityProjection) + + case (RangeFrame, offset: Expression) if orderSpec.size == 1 => +// Use only the first order expression when the offset is non-null. +val sortExpr = orderSpec.head +val expr = sortExpr.child + +// Create the projection which returns the current 'value'. 
+val current = newMutableProjection(expr :: Nil, child.output) + +// Flip the sign of the offset when processing the order is descending +val boundOffset = sortExpr.direction match { + case Descending => UnaryMinus(offset) + case Ascending => offset +} + +// Create the projection which returns the current 'value' modified by adding the offset. +val boundExpr = (expr.dataType, boundOffset.dataType) match { + case (DateType, IntegerType) => DateAdd(expr, boundOffset) + case (TimestampType, CalendarIntervalType) => +TimeAdd(expr, boundOffset, Some
[GitHub] spark pull request #22305: [WIP][SPARK-24561][SQL][Python] User-defined wind...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r218243887 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.window + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, TimestampType} + +private[sql] abstract class WindowExecBase( +windowExpression: Seq[NamedExpression], +partitionSpec: Seq[Expression], +orderSpec: Seq[SortOrder], +child: SparkPlan) extends UnaryExecNode { + + /** + * Create the resulting projection. + * + * This method uses Code Generation. It can only be used on the executor side. + * + * @param expressions unbound ordered function expressions. + * @return the final resulting projection. 
+ */ + protected def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = { +val references = expressions.zipWithIndex.map { case (e, i) => + // Results of window expressions will be on the right side of child's output + BoundReference(child.output.size + i, e.dataType, e.nullable) +} +val unboundToRefMap = expressions.zip(references).toMap +val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap)) +UnsafeProjection.create( + child.output ++ patchedWindowExpression, + child.output) + } + + /** + * Create a bound ordering object for a given frame type and offset. A bound ordering object is + * used to determine which input row lies within the frame boundaries of an output row. + * + * This method uses Code Generation. It can only be used on the executor side. + * + * @param frame to evaluate. This can either be a Row or Range frame. + * @param bound with respect to the row. + * @param timeZone the session local timezone for time related calculations. + * @return a bound ordering object. + */ + protected def createBoundOrdering( + frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = { +(frame, bound) match { + case (RowFrame, CurrentRow) => +RowBoundOrdering(0) + + case (RowFrame, IntegerLiteral(offset)) => +RowBoundOrdering(offset) + + case (RangeFrame, CurrentRow) => +val ordering = newOrdering(orderSpec, child.output) +RangeBoundOrdering(ordering, IdentityProjection, IdentityProjection) + + case (RangeFrame, offset: Expression) if orderSpec.size == 1 => +// Use only the first order expression when the offset is non-null. +val sortExpr = orderSpec.head +val expr = sortExpr.child + +// Create the projection which returns the current 'value'. 
+val current = newMutableProjection(expr :: Nil, child.output) + +// Flip the sign of the offset when processing the order is descending +val boundOffset = sortExpr.direction match { + case Descending => UnaryMinus(offset) + case Ascending => offset +} + +// Create the projection which returns the current 'value' modified by adding the offset. +val boundExpr = (expr.dataType, boundOffset.dataType) match { + case (DateType, IntegerType) => DateAdd(expr, boundOffset) + case (TimestampType, CalendarIntervalType) => +TimeAdd(expr, boundOffset, Some
[GitHub] spark issue #22305: [WIP][SPARK-24561][SQL][Python] User-defined window aggr...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22305 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22329: [SPARK-25328][PYTHON] Add an example for having two colu...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22329 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22329: [SPARK-25328][PYTHON] Add an example for having t...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22329#discussion_r215267320 --- Diff: python/pyspark/sql/functions.py --- @@ -2804,6 +2804,22 @@ def pandas_udf(f=None, returnType=None, functionType=None): | 1|1.5| | 2|6.0| +---+---+ + >>> @pandas_udf( + ..."id long, additional_key double, v double", --- End diff -- do you mind changing the type of additional_key to long? It seems like the type coercion here is not necessary. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22329: [SPARK-25328][PYTHON] Add an example for having t...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22329#discussion_r214940744 --- Diff: python/pyspark/sql/functions.py --- @@ -2804,6 +2804,20 @@ def pandas_udf(f=None, returnType=None, functionType=None): | 1|1.5| | 2|6.0| +---+---+ + >>> @pandas_udf("id long, v1 double, v2 double", PandasUDFType.GROUPED_MAP) # doctest: +SKIP --- End diff -- It took me a while to realize `v1` is a grouping key. It is also a bit uncommon to use a double value as a grouping key. How about we do something like `id long, additional_key long, v double`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22305: [WIP][SPARK-24561][SQL][Python] User-defined window aggr...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22305 The current state is a minimum working version - I copied some code from `WindowExec` to make this work but will need to refactor those. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [WIP][SPARK-24561][SQL][Python] User-defined wind...
GitHub user icexelloss opened a pull request: https://github.com/apache/spark/pull/22305 [WIP][SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) ## What changes were proposed in this pull request? ### **This is currently WIP** This PR implements a new feature - window aggregation Pandas UDF for bounded window. Example: ``` @pandas_udf('double', PandasUDFType.GROUPED_AGG) def avg(v): return v.mean() return avg w = Window.partitionBy('id').rowsBetween(-2, 3) result1 = df.withColumn('mean_v', avg(df['v']).over(w)) ``` ## How was this patch tested? New tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/icexelloss/spark SPARK-24561-bounded-window-udf Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22305.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22305 commit 4a4ba9406b7cb393eb4397083f872ee463eca97e Author: Li Jin Date: 2018-08-29T20:39:16Z wip commit f9e73265dee746e3192a858bea1b5eca6a1b1826 Author: Li Jin Date: 2018-08-30T14:49:16Z Remove empty line commit 28414fb9f099bd063dc24cb36878d006ccd2d53d Author: Li Jin Date: 2018-08-31T14:51:09Z Initial commit (WIP) commit 51a9dcf0166d68ce7fbdc380a686426eeff5ebb6 Author: Li Jin Date: 2018-08-31T15:22:44Z Fix case for unbounded window --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22208: [SPARK-25216][SQL] Improve error message when a column c...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22208 @dongjoon-hyun SGTM. I misunderstood your suggestion about resolver. Keeping it simple was my preference too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22208: [SPARK-25216][SQL] Improve error message when a column c...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22208 @dongjoon-hyun Could you please take another look? I changed it to use the resolver and try to resolve the column with backticks, and added unit tests as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in FileSo...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22104 Thanks all for the review! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22244: [WIP][SPARK-24721][SPARK-25213][SQL] extract python UDF ...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22244 @cloud-fan Thanks! I will take a look later today and incorporate this with my patch. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22208: [SPARK-25216][SQL] Improve error message when a c...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22208#discussion_r212716787 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -216,8 +216,16 @@ class Dataset[T] private[sql]( private[sql] def resolve(colName: String): NamedExpression = { queryExecution.analyzed.resolveQuoted(colName, sparkSession.sessionState.analyzer.resolver) .getOrElse { -throw new AnalysisException( - s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""") +if (schema.fieldNames.contains(colName)) { + throw new AnalysisException( +s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")}). + | Try adding backticks to the column name, i.e., `$colName`""" --- End diff -- I see, how about: ``` Try adding backticks to the column name, i.e., `$colName`, if $colName is the name of the whole column ``` I am fine with either one --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22208: [SPARK-25216][SQL] Improve error message when a c...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22208#discussion_r212629188 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -216,8 +216,16 @@ class Dataset[T] private[sql]( private[sql] def resolve(colName: String): NamedExpression = { queryExecution.analyzed.resolveQuoted(colName, sparkSession.sessionState.analyzer.resolver) .getOrElse { -throw new AnalysisException( - s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""") +if (schema.fieldNames.contains(colName)) { + throw new AnalysisException( +s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")}). + | Try adding backticks to the column name, i.e., `$colName`""" --- End diff -- @HyukjinKwon Thanks for the review! Sorry I don't quite understand your sentence here: > if the name parts in the column should be kept as the part of its column name Would you mind elaborating what do you mean? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22104#discussion_r212460124 --- Diff: python/pyspark/sql/tests.py --- @@ -3367,6 +3367,35 @@ def test_ignore_column_of_all_nulls(self): finally: shutil.rmtree(path) +# SPARK-24721 +def test_datasource_with_udf_filter_lit_input(self): +import pandas as pd +import numpy as np +from pyspark.sql.functions import udf, pandas_udf, lit, col + +path = tempfile.mkdtemp() +shutil.rmtree(path) +try: + self.spark.range(1).write.mode("overwrite").format('csv').save(path) +filesource_df = self.spark.read.csv(path) --- End diff -- Created separate tests for pandas_udf under ScalarPandasUDFTests --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in FileSo...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22104 @HyukjinKwon I addressed the comments. Do you mind taking another look? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22208: Improve error message when a column containing do...
GitHub user icexelloss opened a pull request: https://github.com/apache/spark/pull/22208 Improve error message when a column containing dot cannot be resolved ## What changes were proposed in this pull request? The current error message is often confusing to a new Spark user that a column containing "." needs backticks quote. For example, consider the following code: ``` spark.range(0, 1).toDF('a.b')['a.b'] ``` the current message looks like: ``` Cannot resolve column name "a.b" among (a.b) ``` This PR improves the error message to, ``` Cannot resolve column name "a.b" among (a.b). Try adding backticks to the column name, i.e., `a.b`; ``` ## How was this patch tested? Manual test in shell You can merge this pull request into a Git repository by running: $ git pull https://github.com/icexelloss/spark SPARK-25216-backticks-error-message Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22208.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22208 commit 21a3732104f311785946a0808dbc132f0e7a892e Author: Li Jin Date: 2018-08-23T18:42:37Z Improve error message when a column containing dot cannot be resolved --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22104#discussion_r212396812 --- Diff: python/pyspark/sql/tests.py --- @@ -3367,6 +3367,33 @@ def test_ignore_column_of_all_nulls(self): finally: shutil.rmtree(path) +# SPARK-24721 +def test_datasource_with_udf_filter_lit_input(self): +from pyspark.sql.functions import udf, lit, col + +path = tempfile.mkdtemp() +shutil.rmtree(path) +try: + self.spark.range(1).write.mode("overwrite").format('csv').save(path) +filesource_df = self.spark.read.csv(path) +datasource_df = self.spark.read \ +.format("org.apache.spark.sql.sources.SimpleScanSource") \ +.option('from', 0).option('to', 1).load() +datasource_v2_df = self.spark.read \ + .format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \ --- End diff -- Added checks to skip the tests if scala tests are not compiled --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22104#discussion_r212347966 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala --- @@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil } }.toArray }.toArray - val projection = newMutableProjection(allInputs, child.output) + + // Project input rows to unsafe row so we can put it in the row queue + val unsafeProjection = UnsafeProjection.create(child.output, child.output) --- End diff -- Created https://jira.apache.org/jira/browse/SPARK-25213 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22104#discussion_r212340459 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala --- @@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil } }.toArray }.toArray - val projection = newMutableProjection(allInputs, child.output) + + // Project input rows to unsafe row so we can put it in the row queue + val unsafeProjection = UnsafeProjection.create(child.output, child.output) --- End diff -- @cloud-fan Sorry, I don't think I am being very clear... > If the data source does not produce UnsafeRow, Spark will make sure there will be a project > above it to produce UnsafeRow I don't think this is happening for datasource V2 right now: (Code running in pyspark test) ``` datasource_v2_df = self.spark.read \ .format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \ .load() result = datasource_v2_df.withColumn('x', udf(lambda x: x, 'int')(datasource_v2_df['i'])) result.show() ``` The code above fails with: ``` Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1$$anonfun$5.apply(EvalPythonExec.scala:127) at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1$$anonfun$5.apply(EvalPythonExec.scala:126) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1074) ``` I think this is an issue with DataSourceV2 that probably should be addressed in another PR (DataSourceV1 works fine). @cloud-fan WDYT? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22104#discussion_r212309541 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala --- @@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil } }.toArray }.toArray - val projection = newMutableProjection(allInputs, child.output) + + // Project input rows to unsafe row so we can put it in the row queue + val unsafeProjection = UnsafeProjection.create(child.output, child.output) --- End diff -- Thanks! I will remove this then. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream forma...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r211964996 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala --- @@ -183,34 +178,106 @@ private[sql] object ArrowConverters { } /** - * Convert a byte array to an ArrowRecordBatch. + * Load a serialized ArrowRecordBatch. */ - private[arrow] def byteArrayToBatch( + private[arrow] def loadBatch( batchBytes: Array[Byte], allocator: BufferAllocator): ArrowRecordBatch = { -val in = new ByteArrayReadableSeekableByteChannel(batchBytes) -val reader = new ArrowFileReader(in, allocator) - -// Read a batch from a byte stream, ensure the reader is closed -Utils.tryWithSafeFinally { - val root = reader.getVectorSchemaRoot // throws IOException - val unloader = new VectorUnloader(root) - reader.loadNextBatch() // throws IOException - unloader.getRecordBatch -} { - reader.close() -} +val in = new ByteArrayInputStream(batchBytes) +MessageSerializer.deserializeRecordBatch( + new ReadChannel(Channels.newChannel(in)), allocator) // throws IOException } + /** + * Create a DataFrame from a JavaRDD of serialized ArrowRecordBatches. 
+ */ private[sql] def toDataFrame( - payloadRDD: JavaRDD[Array[Byte]], + arrowBatchRDD: JavaRDD[Array[Byte]], schemaString: String, sqlContext: SQLContext): DataFrame = { -val rdd = payloadRDD.rdd.mapPartitions { iter => +val schema = DataType.fromJson(schemaString).asInstanceOf[StructType] +val timeZoneId = sqlContext.sessionState.conf.sessionLocalTimeZone +val rdd = arrowBatchRDD.rdd.mapPartitions { iter => val context = TaskContext.get() - ArrowConverters.fromPayloadIterator(iter.map(new ArrowPayload(_)), context) + ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, context) } -val schema = DataType.fromJson(schemaString).asInstanceOf[StructType] sqlContext.internalCreateDataFrame(rdd, schema) } + + /** + * Read a file as an Arrow stream and parallelize as an RDD of serialized ArrowRecordBatches. + */ + private[sql] def readArrowStreamFromFile( + sqlContext: SQLContext, + filename: String): JavaRDD[Array[Byte]] = { +val fileStream = new FileInputStream(filename) +try { + // Create array so that we can safely close the file + val batches = getBatchesFromStream(fileStream.getChannel).toArray + // Parallelize the record batches to create an RDD + JavaRDD.fromRDD(sqlContext.sparkContext.parallelize(batches, batches.length)) +} finally { + fileStream.close() +} + } + + /** + * Read an Arrow stream input and return an iterator of serialized ArrowRecordBatches. 
+ */ + private[sql] def getBatchesFromStream(in: SeekableByteChannel): Iterator[Array[Byte]] = { + +// Create an iterator to get each serialized ArrowRecordBatch from a stream +new Iterator[Array[Byte]] { + var batch: Array[Byte] = readNextBatch() + + override def hasNext: Boolean = batch != null + + override def next(): Array[Byte] = { +val prevBatch = batch +batch = readNextBatch() +prevBatch + } + + def readNextBatch(): Array[Byte] = { +val msgMetadata = MessageSerializer.readMessage(new ReadChannel(in)) +if (msgMetadata == null) { + return null +} + +// Get the length of the body, which has not be read at this point +val bodyLength = msgMetadata.getMessageBodyLength.toInt + +// Only care about RecordBatch data, skip Schema and unsupported Dictionary messages +if (msgMetadata.getMessage.headerType() == MessageHeader.RecordBatch) { + + // Create output backed by buffer to hold msg length (int32), msg metadata, msg body + val bbout = new ByteBufferOutputStream(4 + msgMetadata.getMessageLength + bodyLength) --- End diff -- Add a comment that this is the deserialized form of an Arrow Record Batch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22104#discussion_r211733007 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala --- @@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil } }.toArray }.toArray - val projection = newMutableProjection(allInputs, child.output) + + // Project input rows to unsafe row so we can put it in the row queue + val unsafeProjection = UnsafeProjection.create(child.output, child.output) --- End diff -- Friendly ping @cloud-fan. Do you think forcing an unsafe projection here to deal with non-unsafe rows from data sources is correct? Is there a way to know whether the child nodes output unsafe rows, so as to avoid an unnecessary unsafe projection here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22104#discussion_r210996331 --- Diff: python/pyspark/sql/tests.py --- @@ -3367,6 +3367,33 @@ def test_ignore_column_of_all_nulls(self): finally: shutil.rmtree(path) +# SPARK-24721 +def test_datasource_with_udf_filter_lit_input(self): +from pyspark.sql.functions import udf, lit, col + +path = tempfile.mkdtemp() +shutil.rmtree(path) +try: + self.spark.range(1).write.mode("overwrite").format('csv').save(path) +filesource_df = self.spark.read.csv(path) +datasource_df = self.spark.read \ +.format("org.apache.spark.sql.sources.SimpleScanSource") \ +.option('from', 0).option('to', 1).load() +datasource_v2_df = self.spark.read \ + .format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \ --- End diff -- Hmm... I think this is a bit fragile because things like "scala-2.11" (scala version can change). Seems a bit over complicated to do this properly, when do we expect pyspark test to run without compiling scala test classes? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22104#discussion_r210955687 --- Diff: python/pyspark/sql/tests.py --- @@ -3367,6 +3367,33 @@ def test_ignore_column_of_all_nulls(self): finally: shutil.rmtree(path) +# SPARK-24721 +def test_datasource_with_udf_filter_lit_input(self): +from pyspark.sql.functions import udf, lit, col + +path = tempfile.mkdtemp() +shutil.rmtree(path) +try: + self.spark.range(1).write.mode("overwrite").format('csv').save(path) +filesource_df = self.spark.read.csv(path) +datasource_df = self.spark.read \ +.format("org.apache.spark.sql.sources.SimpleScanSource") \ +.option('from', 0).option('to', 1).load() +datasource_v2_df = self.spark.read \ + .format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \ --- End diff -- I can probably try to check the existence of sql/core/target/scala-2.11/test-classes --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22104#discussion_r210954447 --- Diff: python/pyspark/sql/tests.py --- @@ -3367,6 +3367,33 @@ def test_ignore_column_of_all_nulls(self): finally: shutil.rmtree(path) +# SPARK-24721 +def test_datasource_with_udf_filter_lit_input(self): +from pyspark.sql.functions import udf, lit, col + +path = tempfile.mkdtemp() +shutil.rmtree(path) +try: + self.spark.range(1).write.mode("overwrite").format('csv').save(path) +filesource_df = self.spark.read.csv(path) +datasource_df = self.spark.read \ +.format("org.apache.spark.sql.sources.SimpleScanSource") \ +.option('from', 0).option('to', 1).load() +datasource_v2_df = self.spark.read \ + .format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \ --- End diff -- @HyukjinKwon I actually am not sure how does pyspark find these classes and how to check the existence, do you have an example? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in FileSo...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22104 Tests pass now. This comment https://github.com/apache/spark/pull/22104/files#r210414941 requires some attention. @cloud-fan Do you think this is the right way to handle GenericInternalRow inputs here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22104#discussion_r210414941 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala --- @@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil } }.toArray }.toArray - val projection = newMutableProjection(allInputs, child.output) + + // Project input rows to unsafe row so we can put it in the row queue + val unsafeProjection = UnsafeProjection.create(child.output, child.output) --- End diff -- This requires some discussion. This is probably another bug I found in testing this - If the input node to EvalPythonExec doesn't produce UnsafeRow, the cast here will fail. I don't know if we require data sources to produce unsafe rows; if not, then this is a problem. I also don't know if this will introduce an additional copy if the input is already UnsafeRow - it seems like UnsafeProject should be smart enough to skip the copy, but I am not sure if that's actually the case --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22104#discussion_r210410738 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala --- @@ -117,15 +117,16 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil } }.toArray }.toArray - val projection = newMutableProjection(allInputs, child.output) + val projection = UnsafeProjection.create(allInputs, child.output) val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) => StructField(s"_$i", dt) }) // Add rows to queue to join later with the result. val projectedRowIter = iter.map { inputRow => -queue.add(inputRow.asInstanceOf[UnsafeRow]) -projection(inputRow) +val unsafeRow = projection(inputRow) +queue.add(unsafeRow.asInstanceOf[UnsafeRow]) --- End diff -- Ok.. This seems to break existing tests. Need to look into it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22104#discussion_r210391237 --- Diff: python/pyspark/sql/tests.py --- @@ -3367,6 +3367,35 @@ def test_ignore_column_of_all_nulls(self): finally: shutil.rmtree(path) +# SPARK-24721 +def test_datasource_with_udf_filter_lit_input(self): +import pandas as pd +import numpy as np +from pyspark.sql.functions import udf, pandas_udf, lit, col + +path = tempfile.mkdtemp() +shutil.rmtree(path) +try: + self.spark.range(1).write.mode("overwrite").format('csv').save(path) +filesource_df = self.spark.read.csv(path) --- End diff -- @gatorsmile Added tests for file source, data source and data source v2. I might need to move the pandas_udf tests into another tests because of arrow_requirement :( --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22104#discussion_r210390770 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -133,6 +134,9 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { } def apply(plan: SparkPlan): SparkPlan = plan transformUp { +// SPARK-24721: Ignore Python UDFs in DataSourceScan and DataSourceV2Scan +case plan: DataSourceScanExec => plan --- End diff -- I got rid of the logic previously in `FileSourceStrategy` to exclude PythonUDF in the filter in favor of this fix - I think this fix is cleaner. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22104#discussion_r210390399 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala --- @@ -117,15 +117,16 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil } }.toArray }.toArray - val projection = newMutableProjection(allInputs, child.output) + val projection = UnsafeProjection.create(allInputs, child.output) val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) => StructField(s"_$i", dt) }) // Add rows to queue to join later with the result. val projectedRowIter = iter.map { inputRow => -queue.add(inputRow.asInstanceOf[UnsafeRow]) -projection(inputRow) +val unsafeRow = projection(inputRow) +queue.add(unsafeRow.asInstanceOf[UnsafeRow]) --- End diff -- This is probably another bug I found in testing this - If the input node to EvalPythonExec doesn't produce UnsafeRow, the cast here will fail. I found this in testing: when I pass in a test data source scan node, which produces GenericInternalRow, it will throw an exception here. I am happy to submit this as a separate patch if people think it's necessary --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in FileSo...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22104 Thanks @HyukjinKwon and @cloud-fan ! I will take a look --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in FileSo...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22104 I think another way to fix this is to move the logic to `ExtractPythonUDFs` to ignore `FileScanExec`, `DataSourceScanExec` and `DataSourceV2ScanExec` instead of changing all three rules. The downside is that if an XScanExec node with a pushed-down Python UDF filter throws an exception somewhere else, we need to fix that too. Not sure which way is better. But either way, it would be good to create a test case with data source and data source V2... Would appreciate some advice on how to create such a relation in a test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in FileSo...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22104 @gatorsmile Can you advise how to create a df with data source? All my attempts end up triggering FileSourceStrategy not DataSourceStrategy --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in FileSo...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22104 @gatorsmile Possibly, let me see if I can create a test case --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22104#discussion_r210052093 --- Diff: python/pyspark/sql/tests.py --- @@ -3367,6 +3367,24 @@ def test_ignore_column_of_all_nulls(self): finally: shutil.rmtree(path) +def test_datasource_with_udf_filter_lit_input(self): --- End diff -- Make sense. Will add. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in FileSo...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22104 retest please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in FileSo...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/22104 cc @cloud-fan . Followed your suggestion here: https://issues.apache.org/jira/browse/SPARK-24721?focusedCommentId=16560537&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16560537 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
GitHub user icexelloss opened a pull request: https://github.com/apache/spark/pull/22104 [SPARK-24721][SQL] Exclude Python UDFs filters in FileSourceStrategy ## What changes were proposed in this pull request? The PR excludes Python UDFs filters in FileSourceStrategy so that they don't cause the ExtractPythonUDF rule to throw an exception. It doesn't make sense to pass Python UDF filters in FileSourceStrategy anyway because they cannot be used as push down filters. ## How was this patch tested? Add a new regression test You can merge this pull request into a Git repository by running: $ git pull https://github.com/icexelloss/spark SPARK-24721-udf-filter Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22104.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22104 commit 512f4b64cb7662baa23995c6f6c109a735ec8f5e Author: Li Jin Date: 2018-08-14T14:22:50Z Fix file strategy to exclude python UDF filters --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/21928 I see. Yeah sounds good to me. On Tue, Jul 31, 2018 at 12:30 PM Hyukjin Kwon wrote: > I think we shouldn't change minimum PyArrow version in 2.4.0 and the > upgrade doesn't require to change it as far as I remember. > > We should backport this anyway even if we will upgrade it for 2.4.0. let's > go ahead. > > — > You are receiving this because you were mentioned. > Reply to this email directly, view it on GitHub > <https://github.com/apache/spark/pull/21928#issuecomment-409283599>, or mute > the thread > <https://github.com/notifications/unsubscribe-auth/AAwbrI0nm_g9FHFrQFonGQtzvh7HeQQ6ks5uMIYJgaJpZM4VnjD_> > . > --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21887: [SPARK-23633][SQL] Update Pandas UDFs section in sql-pro...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/21887 Thanks! @HyukjinKwon --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/21928 @HyukjinKwon arrow 0.10.0 release is around the corner. I think Spark 2.4 will very likely ship with 0.10.0 (where I believe this issue has been fixed, @BryanCutler can you confirm?) I am not sure if it's necessary to have this patch just for pyarrow 0.9.0 ... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21887: [SPARK-23633][SQL] Update Pandas UDFs section in sql-pro...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/21887 @HyukjinKwon I manually generated the doc and it looks good to me. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21887: [SPARK-23633][SQL] Update Pandas UDFs section in ...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21887#discussion_r205943208 --- Diff: examples/src/main/python/sql/arrow.py --- @@ -113,6 +113,42 @@ def substract_mean(pdf): # $example off:grouped_map_pandas_udf$ +def grouped_agg_pandas_udf_example(spark): +# $example on:grouped_agg_pandas_udf$ +from pyspark.sql.functions import pandas_udf, PandasUDFType +from pyspark.sql import Window + +df = spark.createDataFrame( +[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +("id", "v")) + +@pandas_udf("double", PandasUDFType.GROUPED_AGG) +def mean_udf(v): +return v.mean() +df.groupby("id").agg(mean_udf(df['v'])).show() +# +---+---+ +# | id|mean_udf(v)| +# +---+---+ +# | 1|1.5| +# | 2|6.0| +# +---+---+ + +w = Window \ +.partitionBy('id') \ +.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) +df.withColumn('mean_v', mean_udf(df['v']).over(w)).show() +# +---++--+ +# | id| v|mean_v| +# +---++--+ +# | 1| 1.0| 1.5| +# | 1| 2.0| 1.5| +# | 2| 3.0| 6.0| +# | 2| 5.0| 6.0| +# | 2|10.0| 6.0| +# +---++--+ +# $example off:grouped_map_pandas_udf$ --- End diff -- Ah good catch, my bad. Let me try building the doc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/21650 Thanks @HyukjinKwon @BryanCutler for the review! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/21650 retest please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205872386 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -94,36 +95,52 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private def hasPythonUDF(e: Expression): Boolean = { + private type EvalType = Int + private type EvalTypeChecker = EvalType => Boolean + + private def hasScalarPythonUDF(e: Expression): Boolean = { e.find(PythonUDF.isScalarPythonUDF).isDefined } private def canEvaluateInPython(e: PythonUDF): Boolean = { e.children match { // single PythonUDF child could be chained and evaluated in Python - case Seq(u: PythonUDF) => canEvaluateInPython(u) + case Seq(u: PythonUDF) => e.evalType == u.evalType && canEvaluateInPython(u) // Python UDF can't be evaluated directly in JVM - case children => !children.exists(hasPythonUDF) + case children => !children.exists(hasScalarPythonUDF) } } - private def collectEvaluatableUDF(expr: Expression): Seq[PythonUDF] = expr match { -case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) => Seq(udf) -case e => e.children.flatMap(collectEvaluatableUDF) + private def collectEvaluableUDFsFromExpressions(expressions: Seq[Expression]): Seq[PythonUDF] = { +// Eval type checker is set once when we find the first evaluable UDF and its value +// shouldn't change later. +// Used to check if subsequent UDFs are of the same type as the first UDF. 
(since we can only +// extract UDFs of the same eval type) +var evalTypeChecker: Option[EvalTypeChecker] = None + +def collectEvaluableUDFs(expr: Expression): Seq[PythonUDF] = expr match { + case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) +&& evalTypeChecker.isEmpty => +evalTypeChecker = Some((otherEvalType: EvalType) => otherEvalType == udf.evalType) +Seq(udf) --- End diff -- @HyukjinKwon In your code this line is `collectEvaluableUDFs (udf)`. I think we should just return `Seq(udf)` to avoid checking the expression twice. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205866645 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -94,36 +95,61 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private def hasPythonUDF(e: Expression): Boolean = { + private case class EvalTypeHolder(private var evalType: Int = -1) { --- End diff -- Ok, I will update the code then. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205859891 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -94,36 +95,61 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private def hasPythonUDF(e: Expression): Boolean = { + private case class EvalTypeHolder(private var evalType: Int = -1) { --- End diff -- I see... You use a var and a nested function definition to remove the need for a holder object. IMHO I usually find nested function definitions, and functions that refer to variables outside their definition scope, hard to read, but it could be my personal preference. Another thing I like about the current impl is that the `EvalTypeHolder` class ensures its value is never changed once it's set, so I think that's more robust. That being said, I am ok with your suggestions too if you insist or @BryanCutler also prefers it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21887: [SPARK-23633][SQL] Update Pandas UDFs section in sql-pro...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/21887 Thanks @HyukjinKwon ! I addressed the comments. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/21650 @BryanCutler @HyukjinKwon I updated the PR based on Bryan's suggestion. Please take a look and let me know if you have further comments. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/21650 @HyukjinKwon I think Bryan's implementation looks promising. Please let me take a look. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21887: [SPARK-23633][SQL] Update Pandas UDFs section in ...
GitHub user icexelloss opened a pull request: https://github.com/apache/spark/pull/21887 [SPARK-23633][SQL] Update Pandas UDFs section in sql-programming-guide ## What changes were proposed in this pull request? Update Pandas UDFs section in sql-programming-guide. Add section for grouped aggregate pandas UDF. ## How was this patch tested? You can merge this pull request into a Git repository by running: $ git pull https://github.com/icexelloss/spark SPARK-23633-sql-programming-guide Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21887.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21887 commit 2d30b2d00458093a9a4a5941d4f66be2da9c6b97 Author: Li Jin Date: 2018-07-26T19:19:54Z Update Pandas UDF section for grouped aggregate --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205448677 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private def hasPythonUDF(e: Expression): Boolean = { + private case class LazyEvalType(var evalType: Int = -1) { + +def isSet: Boolean = evalType >= 0 + +def set(evalType: Int): Unit = { + if (isSet) { +throw new IllegalStateException("Eval type has already been set") + } else { +this.evalType = evalType + } +} + +def get(): Int = { + if (!isSet) { +throw new IllegalStateException("Eval type is not set") + } else { +evalType + } +} + } + + private def hasScalarPythonUDF(e: Expression): Boolean = { e.find(PythonUDF.isScalarPythonUDF).isDefined } - private def canEvaluateInPython(e: PythonUDF): Boolean = { -e.children match { - // single PythonUDF child could be chained and evaluated in Python - case Seq(u: PythonUDF) => canEvaluateInPython(u) - // Python UDF can't be evaluated directly in JVM - case children => !children.exists(hasPythonUDF) + /** + * Check whether a PythonUDF expression can be evaluated in Python. + * + * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar + * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the + * specified eval type. + * + * This method will also set the lazy eval type to be the type of the first evaluable expression, + * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval + * type will be set to the eval type of the expression. + * + */ + private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = { --- End diff -- I applied you new code but the test I mentioned above still fails. 
I think the issue could be when visiting `f2(f1(col('v')))`, firstEvalType is set to Scalar Pandas first and isn't set to Batched SQL later so f1 is not extracted. It's possible that my code is still different than yours somehow. But similar to https://github.com/apache/spark/pull/21650#issuecomment-407951457, I think the state machine of the eval type holder object here is fairly complicated (i.e., what is the expected state of the eval type holder and what's the invariant of the algo) with your suggested implementation and I found myself thinking pretty hard to prove the state machine is correct in all cases. If we want to go with this implementation, we need to carefully think about it and explain it in code... The lazyEvalType implementation is better IMHO because the state machine is simpler - lazyEvalType is empty until we find the first evaluable UDF and the value doesn't change after we find the first UDF. The first implementation (two pass, immutable state) is probably the simplest in terms of the mental complexity of the algo but is less efficient. I think I am ok with both immutable state or the lazy state. I think @HyukjinKwon prefers the immutable state one. @BryanCutler WDYT? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205445392 --- Diff: python/pyspark/sql/tests.py --- @@ -5060,6 +5049,147 @@ def test_type_annotation(self): df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id')) self.assertEqual(df.first()[0], 0) +def test_mixed_udf(self): +import pandas as pd +from pyspark.sql.functions import col, udf, pandas_udf + +df = self.spark.range(0, 1).toDF('v') + +# Test mixture of multiple UDFs and Pandas UDFs + +@udf('int') +def f1(x): +assert type(x) == int +return x + 1 + +@pandas_udf('int') +def f2(x): +assert type(x) == pd.Series +return x + 10 + +@udf('int') +def f3(x): +assert type(x) == int +return x + 100 + +@pandas_udf('int') +def f4(x): +assert type(x) == pd.Series +return x + 1000 + +# Test mixed udfs in a single projection +df1 = df \ +.withColumn('f1', f1(col('v'))) \ +.withColumn('f2', f2(col('v'))) \ +.withColumn('f3', f3(col('v'))) \ +.withColumn('f4', f4(col('v'))) \ +.withColumn('f2_f1', f2(col('f1'))) \ +.withColumn('f3_f1', f3(col('f1'))) \ +.withColumn('f4_f1', f4(col('f1'))) \ +.withColumn('f3_f2', f3(col('f2'))) \ +.withColumn('f4_f2', f4(col('f2'))) \ +.withColumn('f4_f3', f4(col('f3'))) \ +.withColumn('f3_f2_f1', f3(col('f2_f1'))) \ +.withColumn('f4_f2_f1', f4(col('f2_f1'))) \ +.withColumn('f4_f3_f1', f4(col('f3_f1'))) \ +.withColumn('f4_f3_f2', f4(col('f3_f2'))) \ +.withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1'))) + +# Test mixed udfs in a single expression +df2 = df \ +.withColumn('f1', f1(col('v'))) \ +.withColumn('f2', f2(col('v'))) \ +.withColumn('f3', f3(col('v'))) \ +.withColumn('f4', f4(col('v'))) \ +.withColumn('f2_f1', f2(f1(col('v' \ +.withColumn('f3_f1', f3(f1(col('v' \ +.withColumn('f4_f1', f4(f1(col('v' \ +.withColumn('f3_f2', f3(f2(col('v' \ +.withColumn('f4_f2', f4(f2(col('v' \ +.withColumn('f4_f3', f4(f3(col('v' \ +.withColumn('f3_f2_f1', f3(f2(f1(col('v') \ +.withColumn('f4_f2_f1', 
f4(f2(f1(col('v') \ +.withColumn('f4_f3_f1', f4(f3(f1(col('v') \ +.withColumn('f4_f3_f2', f4(f3(f2(col('v') \ +.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v')) + +# expected result +df3 = df \ +.withColumn('f1', df['v'] + 1) \ +.withColumn('f2', df['v'] + 10) \ +.withColumn('f3', df['v'] + 100) \ +.withColumn('f4', df['v'] + 1000) \ +.withColumn('f2_f1', df['v'] + 11) \ +.withColumn('f3_f1', df['v'] + 101) \ +.withColumn('f4_f1', df['v'] + 1001) \ +.withColumn('f3_f2', df['v'] + 110) \ +.withColumn('f4_f2', df['v'] + 1010) \ +.withColumn('f4_f3', df['v'] + 1100) \ +.withColumn('f3_f2_f1', df['v'] + 111) \ +.withColumn('f4_f2_f1', df['v'] + 1011) \ +.withColumn('f4_f3_f1', df['v'] + 1101) \ +.withColumn('f4_f3_f2', df['v'] + 1110) \ +.withColumn('f4_f3_f2_f1', df['v'] + ) + +self.assertEquals(df3.collect(), df1.collect()) +self.assertEquals(df3.collect(), df2.collect()) + +def test_mixed_udf_and_sql(self): +import pandas as pd +from pyspark.sql.functions import udf, pandas_udf + +df = self.spark.range(0, 1).toDF('v') + +# Test mixture of UDFs, Pandas UDFs and SQL expression. + +@udf('int') +def f1(x): +assert type(x) == int +return x + 1 + +def f2(x): +return x + 10 + +@pandas_udf('int') +def f3(x): +assert type(x) == pd.Series +return x + 100 + +df1 = df.withColumn('f1', f1(df['v'])) \ +.withColumn('f2', f2(df['v'])) \ +.withColumn('f3', f3(df['v'])) \ +.withColumn('f1_f2', f1(f2(df['v']))) \ +.withColumn('f1_f3', f1(f3(df['v
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205268767 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private def hasPythonUDF(e: Expression): Boolean = { + private case class LazyEvalType(var evalType: Int = -1) { + +def isSet: Boolean = evalType >= 0 + +def set(evalType: Int): Unit = { + if (isSet) { +throw new IllegalStateException("Eval type has already been set") + } else { +this.evalType = evalType + } +} + +def get(): Int = { + if (!isSet) { +throw new IllegalStateException("Eval type is not set") + } else { +evalType + } +} + } + + private def hasScalarPythonUDF(e: Expression): Boolean = { e.find(PythonUDF.isScalarPythonUDF).isDefined } - private def canEvaluateInPython(e: PythonUDF): Boolean = { -e.children match { - // single PythonUDF child could be chained and evaluated in Python - case Seq(u: PythonUDF) => canEvaluateInPython(u) - // Python UDF can't be evaluated directly in JVM - case children => !children.exists(hasPythonUDF) + /** + * Check whether a PythonUDF expression can be evaluated in Python. + * + * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar + * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the + * specified eval type. + * + * This method will also set the lazy eval type to be the type of the first evaluable expression, + * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval + * type will be set to the eval type of the expression. + * + */ + private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = { --- End diff -- Yes it's in the most recent commit. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r205262719 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private def hasPythonUDF(e: Expression): Boolean = { + private case class LazyEvalType(var evalType: Int = -1) { + +def isSet: Boolean = evalType >= 0 + +def set(evalType: Int): Unit = { + if (isSet) { +throw new IllegalStateException("Eval type has already been set") + } else { +this.evalType = evalType + } +} + +def get(): Int = { + if (!isSet) { +throw new IllegalStateException("Eval type is not set") + } else { +evalType + } +} + } + + private def hasScalarPythonUDF(e: Expression): Boolean = { e.find(PythonUDF.isScalarPythonUDF).isDefined } - private def canEvaluateInPython(e: PythonUDF): Boolean = { -e.children match { - // single PythonUDF child could be chained and evaluated in Python - case Seq(u: PythonUDF) => canEvaluateInPython(u) - // Python UDF can't be evaluated directly in JVM - case children => !children.exists(hasPythonUDF) + /** + * Check whether a PythonUDF expression can be evaluated in Python. + * + * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar + * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the + * specified eval type. + * + * This method will also set the lazy eval type to be the type of the first evaluable expression, + * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval + * type will be set to the eval type of the expression. 
+ * + */ + private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = { --- End diff -- Bryan, I tried to apply your implementation and the simple test also fails: ``` @udf('int') def f1(x): assert type(x) == int return x + 1 @pandas_udf('int') def f2(x): assert type(x) == pd.Series return x + 10 df_chained_1 = df.withColumn('f2_f1', f2(f1(df['v']))) expected_chained_1 = df.withColumn('f2_f1', df['v'] + 11) self.assertEquals(expected_chained_1.collect(), df_chained_1.collect()) ``` Do you mind trying this too? Hopefully I didn't do something silly here.. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org