[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...

2018-12-07 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/23248#discussion_r239925749
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] 
with PredicateHelper {
 expressions.flatMap(collectEvaluableUDFs)
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-case plan: LogicalPlan => extract(plan)
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+// SPARK-26293: A subquery will be rewritten into join later, and will 
go through this rule
+// eventually. Here we skip subquery, as Python UDF only needs to be 
extracted once.
+case _: Subquery => plan
--- End diff --

I see. If it's common to skip Subquery in other rules, I guess it's okay to
do it here as well. But it would definitely be helpful to establish some
kind of guidance, maybe something like "All optimizer rules should skip
Subquery because OptimizeSubqueries will execute them anyway"?
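
A hedged sketch of that guideline, mirroring the `case _: Subquery => plan`
pattern in the diff above (the rule name and the `rewrite` body are
placeholders, not actual Spark code):

    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
    import org.apache.spark.sql.catalyst.rules.Rule

    object SomeOptimizerRule extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan match {
        // Skip Subquery wrappers: OptimizeSubqueries re-runs the optimizer on
        // them, so doing the work here would apply the rule to the same
        // subquery plan twice.
        case _: Subquery => plan
        case _ => plan transformUp { case p => rewrite(p) }
      }

      // Placeholder for whatever transformation the rule actually performs.
      private def rewrite(plan: LogicalPlan): LogicalPlan = plan
    }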


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-12-07 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r239922856
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ---
@@ -144,24 +282,107 @@ case class WindowInPandasExec(
 queue.close()
   }
 
-  val inputProj = UnsafeProjection.create(allInputs, child.output)
-  val pythonInput = grouped.map { case (_, rows) =>
-rows.map { row =>
-  queue.add(row.asInstanceOf[UnsafeRow])
-  inputProj(row)
+  val stream = iter.map { row =>
+queue.add(row.asInstanceOf[UnsafeRow])
+row
+  }
+
+  val pythonInput = new Iterator[Iterator[UnsafeRow]] {
+
+// Manage the stream and the grouping.
+var nextRow: UnsafeRow = null
+var nextGroup: UnsafeRow = null
+var nextRowAvailable: Boolean = false
+private[this] def fetchNextRow() {
+  nextRowAvailable = stream.hasNext
+  if (nextRowAvailable) {
+nextRow = stream.next().asInstanceOf[UnsafeRow]
+nextGroup = grouping(nextRow)
+  } else {
+nextRow = null
+nextGroup = null
+  }
+}
+fetchNextRow()
+
+// Manage the current partition.
+val buffer: ExternalAppendOnlyUnsafeRowArray =
+  new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, 
spillThreshold)
+var bufferIterator: Iterator[UnsafeRow] = _
+
+val indexRow = new 
SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType))
+
+val frames = factories.map(_(indexRow))
+
+private[this] def fetchNextPartition() {
+  // Collect all the rows in the current partition.
+  // Before we start to fetch new input rows, make a copy of 
nextGroup.
+  val currentGroup = nextGroup.copy()
+
+  // clear last partition
+  buffer.clear()
+
+  while (nextRowAvailable && nextGroup == currentGroup) {
--- End diff --

I did fix the original `WindowExec` as well but of course we can fix it in 
a separate PR. I will revert my latest change.


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-12-06 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r239587375
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ---
@@ -144,24 +282,107 @@ case class WindowInPandasExec(
 queue.close()
   }
 
-  val inputProj = UnsafeProjection.create(allInputs, child.output)
-  val pythonInput = grouped.map { case (_, rows) =>
-rows.map { row =>
-  queue.add(row.asInstanceOf[UnsafeRow])
-  inputProj(row)
+  val stream = iter.map { row =>
+queue.add(row.asInstanceOf[UnsafeRow])
+row
+  }
+
+  val pythonInput = new Iterator[Iterator[UnsafeRow]] {
+
+// Manage the stream and the grouping.
+var nextRow: UnsafeRow = null
+var nextGroup: UnsafeRow = null
+var nextRowAvailable: Boolean = false
+private[this] def fetchNextRow() {
+  nextRowAvailable = stream.hasNext
+  if (nextRowAvailable) {
+nextRow = stream.next().asInstanceOf[UnsafeRow]
+nextGroup = grouping(nextRow)
+  } else {
+nextRow = null
+nextGroup = null
+  }
+}
+fetchNextRow()
+
+// Manage the current partition.
+val buffer: ExternalAppendOnlyUnsafeRowArray =
+  new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, 
spillThreshold)
+var bufferIterator: Iterator[UnsafeRow] = _
+
+val indexRow = new 
SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType))
+
+val frames = factories.map(_(indexRow))
+
+private[this] def fetchNextPartition() {
+  // Collect all the rows in the current partition.
+  // Before we start to fetch new input rows, make a copy of 
nextGroup.
+  val currentGroup = nextGroup.copy()
+
+  // clear last partition
+  buffer.clear()
+
+  while (nextRowAvailable && nextGroup == currentGroup) {
--- End diff --

Good catch. @ueshin Do you mind double-checking that what I am doing now is
correct?


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-12-06 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r239587065
  
--- Diff: python/pyspark/sql/tests/test_pandas_udf_window.py ---
@@ -87,8 +96,34 @@ def ordered_window(self):
 def unpartitioned_window(self):
 return Window.partitionBy()
 
+@property
+def sliding_row_window(self):
+return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 1)
+
+@property
+def sliding_range_window(self):
+return Window.partitionBy('id').orderBy('v').rangeBetween(-2, 4)
+
+@property
+def growing_row_window(self):
+return 
Window.partitionBy('id').orderBy('v').rowsBetween(Window.unboundedPreceding, 3)
+
+@property
+def growing_range_window(self):
+return Window.partitionBy('id').orderBy('v') \
+.rangeBetween(Window.unboundedPreceding, 4)
+
+@property
+def shrinking_row_window(self):
+return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 
Window.unboundedFollowing)
+
+@property
+def shrinking_range_window(self):
+return Window.partitionBy('id').orderBy('v') \
+.rangeBetween(-3, Window.unboundedFollowing)
+
 def test_simple(self):
-from pyspark.sql.functions import mean
+from pyspark.sql.functions import pandas_udf, PandasUDFType, 
percent_rank, mean, max
--- End diff --

Reverted


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-12-06 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r239587136
  
--- Diff: python/pyspark/sql/tests/test_pandas_udf_window.py ---
@@ -245,11 +278,101 @@ def test_invalid_args(self):
 foo_udf = pandas_udf(lambda x: x, 'v double', 
PandasUDFType.GROUPED_MAP)
 df.withColumn('v2', foo_udf(df['v']).over(w))
 
-with QuietTest(self.sc):
-with self.assertRaisesRegexp(
-AnalysisException,
-'.*Only unbounded window frame is supported.*'):
-df.withColumn('mean_v', mean_udf(df['v']).over(ow))
+def test_bounded_simple(self):
+from pyspark.sql.functions import mean, max, min, count
+
+df = self.data
+w1 = self.sliding_row_window
+w2 = self.shrinking_range_window
+
+plus_one = self.python_plus_one
+count_udf = self.pandas_agg_count_udf
+mean_udf = self.pandas_agg_mean_udf
+max_udf = self.pandas_agg_max_udf
+min_udf = self.pandas_agg_min_udf
+
+result1 = df.withColumn('mean_v', 
mean_udf(plus_one(df['v'])).over(w1)) \
+.withColumn('count_v', count_udf(df['v']).over(w2)) \
+.withColumn('max_v',  max_udf(df['v']).over(w2)) \
+.withColumn('min_v', min_udf(df['v']).over(w1))
+
+expected1 = df.withColumn('mean_v', 
mean(plus_one(df['v'])).over(w1)) \
+.withColumn('count_v', count(df['v']).over(w2)) \
+.withColumn('max_v', max(df['v']).over(w2)) \
+.withColumn('min_v', min(df['v']).over(w1))
+
+self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+
+def test_growing_window(self):
+from pyspark.sql.functions import mean
+
+df = self.data
+w1 = self.growing_row_window
+w2 = self.growing_range_window
+
+mean_udf = self.pandas_agg_mean_udf
+
+result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \
+.withColumn('m2', mean_udf(df['v']).over(w2))
+
+expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \
+.withColumn('m2', mean(df['v']).over(w2))
+
+self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+
+def test_sliding_window(self):
+from pyspark.sql.functions import mean
+
+df = self.data
+w1 = self.sliding_row_window
+w2 = self.sliding_range_window
+
+mean_udf = self.pandas_agg_mean_udf
+
+result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \
+.withColumn('m2', mean_udf(df['v']).over(w2))
+
+expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \
+.withColumn('m2', mean(df['v']).over(w2))
+
+self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+
+def test_shrinking_window(self):
+from pyspark.sql.functions import mean
+
+df = self.data
+w1 = self.shrinking_row_window
+w2 = self.shrinking_range_window
+
+mean_udf = self.pandas_agg_mean_udf
+
+result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \
+.withColumn('m2', mean_udf(df['v']).over(w2))
+
+expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \
+.withColumn('m2', mean(df['v']).over(w2))
+
+self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+
+def test_bounded_mixed(self):
+from pyspark.sql.functions import mean, max, min, count
--- End diff --

Yes - removed


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-12-06 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r239587089
  
--- Diff: python/pyspark/sql/tests/test_pandas_udf_window.py ---
@@ -231,12 +266,10 @@ def test_array_type(self):
 self.assertEquals(result1.first()['v2'], [1.0, 2.0])
 
 def test_invalid_args(self):
-from pyspark.sql.functions import pandas_udf, PandasUDFType
+from pyspark.sql.functions import mean, pandas_udf, PandasUDFType
--- End diff --

Reverted


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-12-06 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r239587020
  
--- Diff: python/pyspark/sql/tests/test_pandas_udf_window.py ---
@@ -44,9 +44,18 @@ def python_plus_one(self):
 
 @property
 def pandas_scalar_time_two(self):
-from pyspark.sql.functions import pandas_udf
+from pyspark.sql.functions import pandas_udf, PandasUDFType
--- End diff --

Sorry. Reverted


---




[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...

2018-12-06 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/23248#discussion_r239565253
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] 
with PredicateHelper {
 expressions.flatMap(collectEvaluableUDFs)
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-case plan: LogicalPlan => extract(plan)
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+// SPARK-26293: A subquery will be rewritten into join later, and will 
go through this rule
+// eventually. Here we skip subquery, as Python UDF only needs to be 
extracted once.
+case _: Subquery => plan
--- End diff --

Personally I found it a bit confusing when two seemingly unrelated things are
put together (Subquery and ExtractPythonUDFs).

I wonder if it's sufficient to make ExtractPythonUDFs idempotent?
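
A minimal sketch of what idempotence would mean here (illustrative check only;
`plan` stands in for any analyzed logical plan):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.python.ExtractPythonUDFs

    // Idempotent means a second application of the rule is a no-op, so it would
    // not matter that a rewritten subquery sends the plan through the rule again.
    def ruleIsIdempotentOn(plan: LogicalPlan): Boolean = {
      val once  = ExtractPythonUDFs(plan)
      val twice = ExtractPythonUDFs(once)
      twice == once  // a real plan test would compare canonicalized plans
    }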


---




[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...

2018-12-05 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22305
  
Hi @BryanCutler @HyukjinKwon @ueshin, mind taking another look? I think
this is in good shape. Thanks!


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-21 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r235417425
  
--- Diff: python/pyspark/worker.py ---
@@ -154,6 +154,47 @@ def wrapped(*series):
 return lambda *a: (wrapped(*a), arrow_return_type)
 
 
+def wrap_bounded_window_agg_pandas_udf(f, return_type):
+arrow_return_type = to_arrow_type(return_type)
+
+def wrapped(begin_index, end_index, *series):
+import numpy as np
+import pandas as pd
+result = []
+for i in range(0, len(begin_index)):
+begin = begin_index[i]
+end = end_index[i]
+range_index = np.arange(begin, end)
+# Note: Create a slice from a series is actually pretty 
expensive to
+#   do for each window. However, there is no way to 
reduce/eliminate
+#   the cost of creating sub series here AFAIK.
+# TODO: s.take might be the best way to create sub series
+series_slices = [s.take(range_index) for s in series]
+result.append(f(*series_slices))
+return pd.Series(result)
+
+return lambda *a: (wrapped(*a), arrow_return_type)
+
+
+def wrap_bounded_window_agg_pandas_udf_np(f, return_type):
--- End diff --

This is removed


---




[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...

2018-11-20 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22305
  
@BryanCutler @HyukjinKwon @ueshin I have addressed all the comments so far. 
Could you please take another look? Thanks!


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-20 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r235182927
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -63,7 +65,7 @@ private[spark] object PythonEvalType {
  */
 private[spark] abstract class BasePythonRunner[IN, OUT](
 funcs: Seq[ChainedPythonFunctions],
-evalType: Int,
+evalTypes: Seq[Int],
--- End diff --

I have reverted the evalType change and now use a runner config instead.


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-19 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r234790479
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ---
@@ -27,17 +27,62 @@ import 
org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, 
SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in 
a single partition.
+ *
+ * It is very similar to [[WindowExec]] and has similar logic. The main 
difference is that this
+ * node doesn't not compute any window aggregation values. Instead, it 
computes the lower and
+ * upper bound for each window (i.e. window bounds) and pass the data and 
indices to python work
+ * to do the actual window aggregation.
+ *
+ * It currently materialize all data associated with the same partition 
key and pass them to
--- End diff --

Thanks! Fixed.


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-19 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r234790633
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ---
@@ -73,68 +118,151 @@ case class WindowInPandasExec(
   }
 
   /**
-   * Create the resulting projection.
-   *
-   * This method uses Code Generation. It can only be used on the executor 
side.
+   * Get all relevant helper functions and data structures for window 
bounds
*
-   * @param expressions unbound ordered function expressions.
-   * @return the final resulting projection.
+   * This function returns:
+   * (1) Total number of window bound indices in the python input row
+   * (2) Function from frame index to its lower bound column index in the 
python input row
+   * (3) Function from frame index to its upper bound column index in the 
python input row
+   * (4) Function indicates whether a frame requires window bound indices
+   * (5) Function from frame index to its eval type
*/
-  private[this] def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
-val references = expressions.zipWithIndex.map { case (e, i) =>
-  // Results of window expressions will be on the right side of 
child's output
-  BoundReference(child.output.size + i, e.dataType, e.nullable)
+  private def computeWindowBoundHelpers(
+  factories: Seq[InternalRow => WindowFunctionFrame]
+  ): (Int, Int => Int, Int => Int, Int => Boolean, Int => Int) = {
--- End diff --

Sounds good. I added a type for that.


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-19 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r234790364
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -89,6 +89,7 @@
 from pyspark.sql.types import _merge_type
 from pyspark.tests import QuietTest, ReusedPySparkTestCase, 
PySparkTestCase, SparkSubmitTests
 from pyspark.sql.functions import UserDefinedFunction, sha2, lit
+import pyspark.sql.functions as F
--- End diff --

Thanks! Removed


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-19 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r234790403
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -7064,12 +7098,104 @@ def test_invalid_args(self):
 foo_udf = pandas_udf(lambda x: x, 'v double', 
PandasUDFType.GROUPED_MAP)
 df.withColumn('v2', foo_udf(df['v']).over(w))
 
-with QuietTest(self.sc):
-with self.assertRaisesRegexp(
-AnalysisException,
-'.*Only unbounded window frame is supported.*'):
-df.withColumn('mean_v', mean_udf(df['v']).over(ow))
+def test_bounded_simple(self):
+from pyspark.sql.functions import mean, max, min, count
+
+df = self.data
+w1 = self.sliding_row_window
+w2 = self.shrinking_range_window
+
+plus_one = self.python_plus_one
+count_udf = self.pandas_agg_count_udf
+mean_udf = self.pandas_agg_mean_udf
+max_udf = self.pandas_agg_max_udf
+min_udf = self.pandas_agg_min_udf
+
+result1 = df.withColumn('mean_v', 
mean_udf(plus_one(df['v'])).over(w1))\
+.withColumn('count_v', count_udf(df['v']).over(w2)) \
+.withColumn('max_v',  max_udf(df['v']).over(w2)) \
+.withColumn('min_v', min_udf(df['v']).over(w1)) \
+
+expected1 = df.withColumn('mean_v', 
mean(plus_one(df['v'])).over(w1))\
+.withColumn('count_v', count(df['v']).over(w2)) \
+.withColumn('max_v', max(df['v']).over(w2)) \
+.withColumn('min_v', min(df['v']).over(w1)) \
+
+self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+
+def test_growing_window(self):
+from pyspark.sql.functions import mean
+
+df = self.data
+w1 = self.growing_row_window
+w2 = self.growing_range_window
 
+mean_udf = self.pandas_agg_mean_udf
+
+result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \
+.withColumn('m2', mean_udf(df['v']).over(w2))
+
+expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \
+.withColumn('m2', mean(df['v']).over(w2))
+
+self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+
+def test_sliding_window(self):
+from pyspark.sql.functions import mean
+
+df = self.data
+w1 = self.sliding_row_window
+w2 = self.sliding_range_window
+
+mean_udf = self.pandas_agg_mean_udf
+
+result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \
+.withColumn('m2', mean_udf(df['v']).over(w2))
+
+expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \
+.withColumn('m2', mean(df['v']).over(w2))
+
+result1.show()
+expected1.show()
--- End diff --

Thanks! Removed


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r232393476
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -6323,6 +6333,33 @@ def ordered_window(self):
 def unpartitioned_window(self):
 return Window.partitionBy()
 
+@property
+def sliding_row_window(self):
+return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 1)
+
+@property
+def sliding_range_window(self):
+from pyspark.sql.functions import lit
+return 
Window.partitionBy('id').orderBy('v').rangeBetween(lit(-2.0), lit(4.0))
--- End diff --

Thanks! I fixed this to use the rangeBetween(Long, Long) API


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r232393452
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -6323,6 +6333,33 @@ def ordered_window(self):
 def unpartitioned_window(self):
 return Window.partitionBy()
 
+@property
+def sliding_row_window(self):
+return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 1)
+
+@property
+def sliding_range_window(self):
+from pyspark.sql.functions import lit
+return 
Window.partitionBy('id').orderBy('v').rangeBetween(lit(-2.0), lit(4.0))
+
+@property
+def growing_row_window(self):
+return 
Window.partitionBy('id').orderBy('v').rowsBetween(Window.unboundedPreceding, 3)
+
+@property
+def growing_range_window(self):
+return Window.partitionBy('id').orderBy('v') \
+.rangeBetween(F.unboundedPreceding(), F.lit(4.0))
--- End diff --

Thanks! I fixed this to use the rangeBetween(Long, Long) API


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r232393335
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ---
@@ -27,17 +27,62 @@ import 
org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, 
SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in 
a single partition.
+ *
+ * It is very similar to [[WindowExec]] and has similar logic. The main 
difference is that this
+ * node doesn't not compute any window aggregation values. Instead, it 
computes the lower and
+ * upper bound for each window (i.e. window bounds) and pass the data and 
indices to python work
+ * to do the actual window aggregation.
+ *
+ * It currently materialize all data associated with the same partition 
key and pass them to
+ * Python. This is not strictly necessary for sliding windows and can be 
improved (by slicing
+ * data into overlapping small chunks and stitch them together).
+ *
+ * This class groups window expressions by their window boundaries so that 
window expressions
+ * with the same window boundaries can share the same window bounds. The 
window bounds are
+ * prepended to the data passed to the python worker.
+ *
+ * For example, if we have:
+ * avg(v) over specifiedwindowframe(RowFrame, -5, 5),
+ * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing),
+ * avg(v) over specifiedwindowframe(RowFrame, -3, 3),
+ * max(v) over specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * The python input will look like:
+ * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v)
+ *
+ * where w1 is specifiedwindowframe(RowFrame, -5, 5)
+ *   w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing)
+ *   w3 is specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * Note that w2 doesn't have bound indices in the python input because its 
unbounded window
+ * so it's bound indices will always be the same.
+ *
+ * Unbounded window also have a different eval type, because:
+ * (1) It doesn't have bound indices as input
+ * (2) The udf only needs to be evaluated once the in python worker 
(because the udf is
+ * deterministic and window bounds are the same for all windows)
+ *
+ * The logic to compute window bounds is delegated to 
[[WindowFunctionFrame]] and shared with
+ * [[WindowExec]]
+ *
+ * Note this doesn't support partial aggregation and all aggregation is 
computed from the entire
+ * window.
+ */
 case class WindowInPandasExec(
 windowExpression: Seq[NamedExpression],
 partitionSpec: Seq[Expression],
 orderSpec: Seq[SortOrder],
-child: SparkPlan) extends UnaryExecNode {
+child: SparkPlan
+) extends WindowExecBase(windowExpression, partitionSpec, orderSpec, 
child) {
--- End diff --

Done


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r232393305
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ---
@@ -73,68 +118,147 @@ case class WindowInPandasExec(
   }
 
   /**
-   * Create the resulting projection.
-   *
-   * This method uses Code Generation. It can only be used on the executor 
side.
+   * Helper function to get all relevant helper functions and data 
structures for window bounds
*
-   * @param expressions unbound ordered function expressions.
-   * @return the final resulting projection.
+   * This function returns:
+   * (1) Total number of window bound indices in the python input row
+   * (2) Function from frame index to its lower bound column index in the 
python input row
+   * (3) Function from frame index to its upper bound column index in the 
python input row
+   * (4) Function that returns a frame requires window bound indices in 
the python input row
+   * (unbounded window doesn't need it)
+   * (5) Function from frame index to its eval type
*/
-  private[this] def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
-val references = expressions.zipWithIndex.map { case (e, i) =>
-  // Results of window expressions will be on the right side of 
child's output
-  BoundReference(child.output.size + i, e.dataType, e.nullable)
+  private def computeWindowBoundHelpers(
+  factories: Seq[InternalRow => WindowFunctionFrame]
+  ): (Int, Int => Int, Int => Int, Int => Boolean, Int => Int) = {
+val dummyRow = new SpecificInternalRow()
+val functionFrames = factories.map(_(dummyRow))
+
+val evalTypes = functionFrames.map {
+  case _: UnboundedWindowFunctionFrame => 
PythonEvalType.SQL_UNBOUNDED_WINDOW_AGG_PANDAS_UDF
+  case _ => PythonEvalType.SQL_BOUNDED_WINDOW_AGG_PANDAS_UDF
+}
+
+val requiredIndices = functionFrames.map {
+  case _: UnboundedWindowFunctionFrame => 0
+  case _ => 2
 }
-val unboundToRefMap = expressions.zip(references).toMap
-val patchedWindowExpression = 
windowExpression.map(_.transform(unboundToRefMap))
-UnsafeProjection.create(
-  child.output ++ patchedWindowExpression,
-  child.output)
+
+val upperBoundIndices = requiredIndices.scan(0)(_ + _).tail
+
+val boundIndices = (requiredIndices zip upperBoundIndices).map {case 
(num, upperBoundIndex) =>
--- End diff --

Done


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r232393187
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ---
@@ -73,68 +118,147 @@ case class WindowInPandasExec(
   }
 
   /**
-   * Create the resulting projection.
-   *
-   * This method uses Code Generation. It can only be used on the executor 
side.
+   * Helper function to get all relevant helper functions and data 
structures for window bounds
*
-   * @param expressions unbound ordered function expressions.
-   * @return the final resulting projection.
+   * This function returns:
+   * (1) Total number of window bound indices in the python input row
+   * (2) Function from frame index to its lower bound column index in the 
python input row
+   * (3) Function from frame index to its upper bound column index in the 
python input row
+   * (4) Function that returns a frame requires window bound indices in 
the python input row
+   * (unbounded window doesn't need it)
+   * (5) Function from frame index to its eval type
*/
-  private[this] def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
-val references = expressions.zipWithIndex.map { case (e, i) =>
-  // Results of window expressions will be on the right side of 
child's output
-  BoundReference(child.output.size + i, e.dataType, e.nullable)
+  private def computeWindowBoundHelpers(
+  factories: Seq[InternalRow => WindowFunctionFrame]
+  ): (Int, Int => Int, Int => Int, Int => Boolean, Int => Int) = {
+val dummyRow = new SpecificInternalRow()
+val functionFrames = factories.map(_(dummyRow))
+
+val evalTypes = functionFrames.map {
+  case _: UnboundedWindowFunctionFrame => 
PythonEvalType.SQL_UNBOUNDED_WINDOW_AGG_PANDAS_UDF
+  case _ => PythonEvalType.SQL_BOUNDED_WINDOW_AGG_PANDAS_UDF
+}
+
+val requiredIndices = functionFrames.map {
+  case _: UnboundedWindowFunctionFrame => 0
+  case _ => 2
 }
-val unboundToRefMap = expressions.zip(references).toMap
-val patchedWindowExpression = 
windowExpression.map(_.transform(unboundToRefMap))
-UnsafeProjection.create(
-  child.output ++ patchedWindowExpression,
-  child.output)
+
+val upperBoundIndices = requiredIndices.scan(0)(_ + _).tail
+
+val boundIndices = (requiredIndices zip upperBoundIndices).map {case 
(num, upperBoundIndex) =>
+if (num == 0) {
+  // Sentinel values for unbounded window
+  (-1, -1)
+} else {
+  (upperBoundIndex - 2, upperBoundIndex - 1)
+}
+}
+
+def lowerBoundIndex(frameIndex: Int) = boundIndices(frameIndex)._1
+def upperBoundIndex(frameIndex: Int) = boundIndices(frameIndex)._2
+def frameEvalType(frameIndex: Int) = evalTypes(frameIndex)
+def frameRequireIndex(frameIndex: Int) =
+  evalTypes(frameIndex) == 
PythonEvalType.SQL_BOUNDED_WINDOW_AGG_PANDAS_UDF
+
+(requiredIndices.sum, lowerBoundIndex, upperBoundIndex, 
frameRequireIndex, frameEvalType)
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-val inputRDD = child.execute()
+// Unwrap the expressions and factories from the map.
+val expressionsWithFrameIndex =
+  windowFrameExpressionFactoryPairs.map(_._1).zipWithIndex.flatMap {
+case (buffer, frameIndex) => buffer.map( expr => (expr, 
frameIndex))
+  }
+
+val expressions = expressionsWithFrameIndex.map(_._1)
+val expressionIndexToFrameIndex =
+  expressionsWithFrameIndex.map(_._2).zipWithIndex.map(_.swap).toMap
+
+val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray
+
+val (numBoundIndices, lowerBoundIndex, upperBoundIndex, 
frameRequireIndex, frameEvalType) =
+  computeWindowBoundHelpers(factories)
+
+val funcEvalTypes = expressions.indices.map(
+  i => frameEvalType(expressionIndexToFrameIndex(i)))
+
+val numFrames = factories.length
+
+val inMemoryThreshold = 
sqlContext.conf.windowExecBufferInMemoryThreshold
+val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold
--- End diff --

Thanks! Done


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r232388369
  
--- Diff: python/pyspark/worker.py ---
@@ -154,6 +154,47 @@ def wrapped(*series):
 return lambda *a: (wrapped(*a), arrow_return_type)
 
 
+def wrap_bounded_window_agg_pandas_udf(f, return_type):
+arrow_return_type = to_arrow_type(return_type)
+
+def wrapped(begin_index, end_index, *series):
+import numpy as np
+import pandas as pd
+result = []
+for i in range(0, len(begin_index)):
+begin = begin_index[i]
+end = end_index[i]
+range_index = np.arange(begin, end)
+# Note: Create a slice from a series is actually pretty 
expensive to
+#   do for each window. However, there is no way to 
reduce/eliminate
+#   the cost of creating sub series here AFAIK.
+# TODO: s.take might be the best way to create sub series
+series_slices = [s.take(range_index) for s in series]
--- End diff --

Good catch, `s[begin:end]` is actually faster
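
A hedged sketch of the slicing variant under discussion (a toy standalone
function, not the code in this PR):

    import pandas as pd

    def bounded_window_agg(f, begin_index, end_index, *series):
        # Same per-window loop as wrap_bounded_window_agg_pandas_udf above, but
        # slicing positionally with iloc instead of s.take(np.arange(begin, end)).
        result = []
        for i in range(len(begin_index)):
            begin, end = begin_index[i], end_index[i]
            series_slices = [s.iloc[begin:end] for s in series]
            result.append(f(*series_slices))
        return pd.Series(result)

    # Toy check: a mean over two windows covering rows [0, 3) and [2, 5).
    v = pd.Series([1.0, 2.0, 3.0, 4.0, 5.0])
    print(bounded_window_agg(lambda s: s.mean(), [0, 2], [3, 5], v))
    # 0    2.0
    # 1    4.0
    # dtype: float64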


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-08 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r232084279
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ---
@@ -73,68 +118,147 @@ case class WindowInPandasExec(
   }
 
   /**
-   * Create the resulting projection.
-   *
-   * This method uses Code Generation. It can only be used on the executor 
side.
+   * Helper function to get all relevant helper functions and data 
structures for window bounds
*
-   * @param expressions unbound ordered function expressions.
-   * @return the final resulting projection.
+   * This function returns:
+   * (1) Total number of window bound indices in the python input row
+   * (2) Function from frame index to its lower bound column index in the 
python input row
+   * (3) Function from frame index to its upper bound column index in the 
python input row
+   * (4) Function that returns a frame requires window bound indices in 
the python input row
+   * (unbounded window doesn't need it)
+   * (5) Function from frame index to its eval type
*/
-  private[this] def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
-val references = expressions.zipWithIndex.map { case (e, i) =>
-  // Results of window expressions will be on the right side of 
child's output
-  BoundReference(child.output.size + i, e.dataType, e.nullable)
+  private def computeWindowBoundHelpers(
+  factories: Seq[InternalRow => WindowFunctionFrame]
+  ): (Int, Int => Int, Int => Int, Int => Boolean, Int => Int) = {
+val dummyRow = new SpecificInternalRow()
--- End diff --

Yes, this is for figuring out the type of each `WindowFunctionFrame`.
These function frames are created temporarily and thrown away when this function
returns, so it's not great... However, in order to create the proper function
frames we would need to know the total number of window indices, so it's a bit of
a chicken-and-egg problem here... I don't see an easy way :(
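
For context, a condensed restatement of the trick from the diff above (a sketch
that assumes `factories`, `SpecificInternalRow` and the frame classes from the
diff are in scope; it is not the merged code):

    // Build each frame against a throwaway row purely to see which concrete
    // WindowFunctionFrame type the factory produces; the real frames are created
    // later, once the bound-index layout is known.
    val dummyRow = new SpecificInternalRow()
    val frameIsUnbounded: Seq[Boolean] = factories.map { factory =>
      factory(dummyRow) match {
        case _: UnboundedWindowFunctionFrame => true   // no bound indices needed
        case _                               => false  // needs (lower, upper) indices
      }
    }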


---




[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...

2018-11-06 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22305
  
No worries. Thank you @HyukjinKwon and @ueshin 


---




[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...

2018-11-05 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22305
  
Hey @gatorsmile, it has been quite a while with no review progress on this.
@BryanCutler has some initial comments, but I want to get more people's feedback
before addressing those. Now that 2.4 is out, I figured you might have some
more bandwidth. Do you think we can get some reviewers' attention on this?


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-10-23 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r227591746
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -6323,6 +6333,33 @@ def ordered_window(self):
 def unpartitioned_window(self):
 return Window.partitionBy()
 
+@property
+def sliding_row_window(self):
--- End diff --

I think they could be variables. I don't particularly care one way or the
other.


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-10-23 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r227591518
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -6481,12 +6516,116 @@ def test_invalid_args(self):
 foo_udf = pandas_udf(lambda x: x, 'v double', 
PandasUDFType.GROUPED_MAP)
 df.withColumn('v2', foo_udf(df['v']).over(w))
 
-with QuietTest(self.sc):
-with self.assertRaisesRegexp(
-AnalysisException,
-'.*Only unbounded window frame is supported.*'):
-df.withColumn('mean_v', mean_udf(df['v']).over(ow))
+def test_bounded_simple(self):
+from pyspark.sql.functions import mean, max, min, count
+
+df = self.data
+w1 = self.sliding_row_window
+w2 = self.shrinking_range_window
+
+plus_one = self.python_plus_one
+count_udf = self.pandas_agg_count_udf
+mean_udf = self.pandas_agg_mean_udf
+max_udf = self.pandas_agg_max_udf
+min_udf = self.pandas_agg_min_udf
+
+result1 = df.withColumn('mean_v', 
mean_udf(plus_one(df['v'])).over(w1))\
+.withColumn('count_v', count_udf(df['v']).over(w2)) \
+.withColumn('max_v',  max_udf(df['v']).over(w2)) \
+.withColumn('min_v', min_udf(df['v']).over(w1)) \
+
+expected1 = df.withColumn('mean_v', 
mean(plus_one(df['v'])).over(w1))\
+.withColumn('count_v', count(df['v']).over(w2)) \
+.withColumn('max_v', max(df['v']).over(w2)) \
+.withColumn('min_v', min(df['v']).over(w1)) \
+
+result1.explain(True)
+expected1.explain(True)
+
+result1.show()
+expected1.show()
--- End diff --

Ah, missed those, will do, thanks!


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-10-23 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r227591428
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -63,7 +65,7 @@ private[spark] object PythonEvalType {
  */
 private[spark] abstract class BasePythonRunner[IN, OUT](
 funcs: Seq[ChainedPythonFunctions],
-evalType: Int,
+evalTypes: Seq[Int],
--- End diff --

> So couldn't you just send an index that encompasses the entire range for 
unbounded

This is actually what I first did. However, I think this would require
sending more data than necessary for the unbounded case. In the worst case it
would be 3x the number of columns (begin_index, end_index, data) compared to
just one column (data).
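
A hedged back-of-the-envelope illustration of that worst case, for a single UDF
input column `v` (the tuple layout follows the WindowInPandasExec doc comment;
the actual serialization is not shown here):

    # Columns shipped to the Python worker per input column, bounded vs. unbounded.
    bounded_input = ("begin_index", "end_index", "v")  # bounds sent per bounded frame
    unbounded_input = ("v",)                           # bounds implied by the partition

    print(len(bounded_input) / len(unbounded_input))   # 3.0 -> the "3x" worst case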


---




[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...

2018-10-23 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22305
  
@felixcheung I am waiting for some in-depth review. @ueshin do you have 
some time to review this in the near future? Thanks!


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-10-11 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r224548624
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -63,7 +65,7 @@ private[spark] object PythonEvalType {
  */
 private[spark] abstract class BasePythonRunner[IN, OUT](
 funcs: Seq[ChainedPythonFunctions],
-evalType: Int,
+evalTypes: Seq[Int],
--- End diff --

I see your point - I can see this being used for other things too, for
example, numpy-variant vectorized UDFs, or window transform UDFs for unbounded
windows (an n -> n mapping over an unbounded window, such as rank). I chose
this approach because of the flexibility.

For this particular case, it is possible to distinguish between
bounded/unbounded, for example, by sending something in the arg offsets or
something like that, but that would be using arg offsets for something else...




---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-10-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r223762966
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -63,7 +65,7 @@ private[spark] object PythonEvalType {
  */
 private[spark] abstract class BasePythonRunner[IN, OUT](
 funcs: Seq[ChainedPythonFunctions],
-evalType: Int,
+evalTypes: Seq[Int],
--- End diff --

There might be other ways to do this. I like extending the single eval type to
a seq of eval types because it seems pretty simple and flexible to me.


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-10-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r223761447
  
--- Diff: python/pyspark/worker.py ---
@@ -154,6 +154,47 @@ def wrapped(*series):
 return lambda *a: (wrapped(*a), arrow_return_type)
 
 
+def wrap_bounded_window_agg_pandas_udf(f, return_type):
+arrow_return_type = to_arrow_type(return_type)
+
+def wrapped(begin_index, end_index, *series):
+import numpy as np
+import pandas as pd
+result = []
+for i in range(0, len(begin_index)):
+begin = begin_index[i]
+end = end_index[i]
+range_index = np.arange(begin, end)
+# Note: Create a slice from a series is actually pretty 
expensive to
+#   do for each window. However, there is no way to 
reduce/eliminate
+#   the cost of creating sub series here AFAIK.
+# TODO: s.take might be the best way to create sub series
+series_slices = [s.take(range_index) for s in series]
--- End diff --

I forgot the reason I chose `take` over `iloc` here. I will do a bit more
investigation and report back.


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-10-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r223754106
  
--- Diff: python/pyspark/worker.py ---
@@ -154,6 +154,47 @@ def wrapped(*series):
 return lambda *a: (wrapped(*a), arrow_return_type)
 
 
+def wrap_bounded_window_agg_pandas_udf(f, return_type):
+arrow_return_type = to_arrow_type(return_type)
+
+def wrapped(begin_index, end_index, *series):
+import numpy as np
+import pandas as pd
+result = []
+for i in range(0, len(begin_index)):
+begin = begin_index[i]
+end = end_index[i]
+range_index = np.arange(begin, end)
+# Note: Create a slice from a series is actually pretty 
expensive to
+#   do for each window. However, there is no way to 
reduce/eliminate
+#   the cost of creating sub series here AFAIK.
+# TODO: s.take might be the best way to create sub series
+series_slices = [s.take(range_index) for s in series]
+result.append(f(*series_slices))
+return pd.Series(result)
+
+return lambda *a: (wrapped(*a), arrow_return_type)
+
+
+def wrap_bounded_window_agg_pandas_udf_np(f, return_type):
--- End diff --

Yes, this is not used. I left it here just to show the difference for now.


---




[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...

2018-10-08 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22305
  
@BryanCutler Yes, that was a typo :) Thanks!

I am also +1 on supporting numpy data structures in addition to pandas. Happy
to discuss here or separately.


---




[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...

2018-10-08 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22305
  
Hey folks, any thoughts on this PR?


---




[GitHub] spark pull request #22620: [SPARK-25601][PYTHON] Register Grouped aggregate ...

2018-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22620#discussion_r222698014
  
--- Diff: python/pyspark/sql/udf.py ---
@@ -310,9 +319,11 @@ def register(self, name, f, returnType=None):
 "Invalid returnType: data type can not be specified 
when f is"
 "a user-defined function, but got %s." % returnType)
 if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
-  PythonEvalType.SQL_SCALAR_PANDAS_UDF]:
+  PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+  
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]:
--- End diff --

I opened https://issues.apache.org/jira/browse/SPARK-25640 to track this.

To be clear, this is transparent to end users, but I agree it can be 
confusing to developers.


---




[GitHub] spark pull request #22620: [SPARK-25601][PYTHON] Register Grouped aggregate ...

2018-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22620#discussion_r222456993
  
--- Diff: python/pyspark/sql/udf.py ---
@@ -310,9 +319,11 @@ def register(self, name, f, returnType=None):
 "Invalid returnType: data type can not be specified 
when f is"
 "a user-defined function, but got %s." % returnType)
 if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
-  PythonEvalType.SQL_SCALAR_PANDAS_UDF]:
+  PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+  
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]:
--- End diff --

We don't need it here:

Users specify GROUPED_AGG only. GROUPED_AGG is turned into the WINDOW_AGG eval
type in WindowInPandasExec.

Admittedly, there is a bit of confusion here that we can improve. We just
haven't had a user-specified UDF type that maps to multiple eval types before
WINDOW_AGG.
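
A hedged usage sketch of that mapping from the user's side (assumes pandas and
PyArrow are installed; the UDF and data are made up for illustration):

    from pyspark.sql import SparkSession, Window
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    spark = SparkSession.builder.getOrCreate()

    # The user only ever declares GROUPED_AGG; Spark picks the window eval type
    # internally when the same UDF is applied over a window.
    @pandas_udf('double', PandasUDFType.GROUPED_AGG)
    def mean_udf(v):
        return v.mean()

    df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ('id', 'v'))

    # Grouped aggregation path.
    df.groupby('id').agg(mean_udf(df['v']).alias('mean_v')).show()

    # Window aggregation path (unbounded frame), same UDF object.
    w = Window.partitionBy('id').rowsBetween(Window.unboundedPreceding,
                                             Window.unboundedFollowing)
    df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()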




---




[GitHub] spark issue #22620: [SPARK-25601][PYTHON] Register Grouped aggregate UDF Vec...

2018-10-03 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22620
  
LGTM


---




[GitHub] spark pull request #22620: [SPARK-25601][PYTHON] Register Grouped aggregate ...

2018-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22620#discussion_r222421940
  
--- Diff: python/pyspark/sql/udf.py ---
@@ -298,6 +298,15 @@ def register(self, name, f, returnType=None):
 >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # 
doctest: +SKIP
 [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
 
+>>> @pandas_udf("integer", PandasUDFType.GROUPED_AGG)  # 
doctest: +SKIP
+... def sum_udf(v):
+... return v.sum()
+...
+>>> _ = spark.udf.register("sum_udf", sum_udf)  # doctest: 
+SKIP
--- End diff --

Ha. I see..


---




[GitHub] spark pull request #22620: [SPARK-25601][PYTHON] Register Grouped aggregate ...

2018-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22620#discussion_r222411585
  
--- Diff: python/pyspark/sql/udf.py ---
@@ -298,6 +298,15 @@ def register(self, name, f, returnType=None):
 >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # 
doctest: +SKIP
 [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
 
+>>> @pandas_udf("integer", PandasUDFType.GROUPED_AGG)  # 
doctest: +SKIP
+... def sum_udf(v):
+... return v.sum()
+...
+>>> _ = spark.udf.register("sum_udf", sum_udf)  # doctest: 
+SKIP
--- End diff --

What is the `_ =` thing here?


---




[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...

2018-09-27 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22305
  
Gental ping @cloud-fan @gatorsmile @HyukjinKwon @ueshin 


---




[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...

2018-09-20 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22305
  
cc @HyukjinKwon @ueshin @BryanCutler @felixcheung 

This PR is ready for review. I have updated the description, so hopefully it
is easier to review. Please let me know if you need any clarification or if
there is anything I can do to help with the review.

Thanks!

Li


---




[GitHub] spark pull request #22305: [WIP][SPARK-24561][SQL][Python] User-defined wind...

2018-09-17 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r218244042
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.window
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import 
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.types.{CalendarIntervalType, DateType, 
IntegerType, TimestampType}
+
+private[sql] abstract class WindowExecBase(
+windowExpression: Seq[NamedExpression],
+partitionSpec: Seq[Expression],
+orderSpec: Seq[SortOrder],
+child: SparkPlan) extends UnaryExecNode {
+
+  /**
+   * Create the resulting projection.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param expressions unbound ordered function expressions.
+   * @return the final resulting projection.
+   */
+  protected def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
+val references = expressions.zipWithIndex.map { case (e, i) =>
+  // Results of window expressions will be on the right side of 
child's output
+  BoundReference(child.output.size + i, e.dataType, e.nullable)
+}
+val unboundToRefMap = expressions.zip(references).toMap
+val patchedWindowExpression = 
windowExpression.map(_.transform(unboundToRefMap))
+UnsafeProjection.create(
+  child.output ++ patchedWindowExpression,
+  child.output)
+  }
+
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param frame to evaluate. This can either be a Row or Range frame.
+   * @param bound with respect to the row.
+   * @param timeZone the session local timezone for time related 
calculations.
+   * @return a bound ordering object.
+   */
+  protected def createBoundOrdering(
+  frame: FrameType, bound: Expression, timeZone: String): 
BoundOrdering = {
+(frame, bound) match {
+  case (RowFrame, CurrentRow) =>
+RowBoundOrdering(0)
+
+  case (RowFrame, IntegerLiteral(offset)) =>
+RowBoundOrdering(offset)
+
+  case (RangeFrame, CurrentRow) =>
+val ordering = newOrdering(orderSpec, child.output)
+RangeBoundOrdering(ordering, IdentityProjection, 
IdentityProjection)
+
+  case (RangeFrame, offset: Expression) if orderSpec.size == 1 =>
+// Use only the first order expression when the offset is non-null.
+val sortExpr = orderSpec.head
+val expr = sortExpr.child
+
+// Create the projection which returns the current 'value'.
+val current = newMutableProjection(expr :: Nil, child.output)
+
+// Flip the sign of the offset when processing the order is 
descending
+val boundOffset = sortExpr.direction match {
+  case Descending => UnaryMinus(offset)
+  case Ascending => offset
+}
+
+// Create the projection which returns the current 'value' 
modified by adding the offset.
+val boundExpr = (expr.dataType, boundOffset.dataType) match {
+  case (DateType, IntegerType) => DateAdd(expr, boundOffset)
+  case (TimestampType, CalendarIntervalType) =>
+TimeAdd(expr, boundOffset, Some

[GitHub] spark pull request #22305: [WIP][SPARK-24561][SQL][Python] User-defined wind...

2018-09-17 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r218243887
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.window
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import 
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.types.{CalendarIntervalType, DateType, 
IntegerType, TimestampType}
+
+private[sql] abstract class WindowExecBase(
+windowExpression: Seq[NamedExpression],
+partitionSpec: Seq[Expression],
+orderSpec: Seq[SortOrder],
+child: SparkPlan) extends UnaryExecNode {
+
+  /**
+   * Create the resulting projection.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param expressions unbound ordered function expressions.
+   * @return the final resulting projection.
+   */
+  protected def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
+val references = expressions.zipWithIndex.map { case (e, i) =>
+  // Results of window expressions will be on the right side of 
child's output
+  BoundReference(child.output.size + i, e.dataType, e.nullable)
+}
+val unboundToRefMap = expressions.zip(references).toMap
+val patchedWindowExpression = 
windowExpression.map(_.transform(unboundToRefMap))
+UnsafeProjection.create(
+  child.output ++ patchedWindowExpression,
+  child.output)
+  }
+
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param frame to evaluate. This can either be a Row or Range frame.
+   * @param bound with respect to the row.
+   * @param timeZone the session local timezone for time related 
calculations.
+   * @return a bound ordering object.
+   */
+  protected def createBoundOrdering(
+  frame: FrameType, bound: Expression, timeZone: String): 
BoundOrdering = {
+(frame, bound) match {
+  case (RowFrame, CurrentRow) =>
+RowBoundOrdering(0)
+
+  case (RowFrame, IntegerLiteral(offset)) =>
+RowBoundOrdering(offset)
+
+  case (RangeFrame, CurrentRow) =>
+val ordering = newOrdering(orderSpec, child.output)
+RangeBoundOrdering(ordering, IdentityProjection, 
IdentityProjection)
+
+  case (RangeFrame, offset: Expression) if orderSpec.size == 1 =>
+// Use only the first order expression when the offset is non-null.
+val sortExpr = orderSpec.head
+val expr = sortExpr.child
+
+// Create the projection which returns the current 'value'.
+val current = newMutableProjection(expr :: Nil, child.output)
+
+// Flip the sign of the offset when processing the order is 
descending
+val boundOffset = sortExpr.direction match {
+  case Descending => UnaryMinus(offset)
+  case Ascending => offset
+}
+
+// Create the projection which returns the current 'value' 
modified by adding the offset.
+val boundExpr = (expr.dataType, boundOffset.dataType) match {
+  case (DateType, IntegerType) => DateAdd(expr, boundOffset)
+  case (TimestampType, CalendarIntervalType) =>
+TimeAdd(expr, boundOffset, Some

[GitHub] spark issue #22305: [WIP][SPARK-24561][SQL][Python] User-defined window aggr...

2018-09-14 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22305
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22329: [SPARK-25328][PYTHON] Add an example for having two colu...

2018-09-06 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22329
  
LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22329: [SPARK-25328][PYTHON] Add an example for having t...

2018-09-05 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22329#discussion_r215267320
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2804,6 +2804,22 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
|  1|1.5|
|  2|6.0|
+---+---+
+   >>> @pandas_udf(
+   ..."id long, additional_key double, v double",
--- End diff --

do you mind changing the type of additional_key to long? It seems like the 
type coercion here is not necessary. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22329: [SPARK-25328][PYTHON] Add an example for having t...

2018-09-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22329#discussion_r214940744
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2804,6 +2804,20 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
|  1|1.5|
|  2|6.0|
+---+---+
+   >>> @pandas_udf("id long, v1 double, v2 double", 
PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
--- End diff --

It took me a while to realize `v1` is a grouping key. It is also a bit 
uncommon to use a double value as a grouping key. How about we do something like:

`id long, additional_key long, v double`
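
Something along these lines is roughly what I had in mind - just a sketch, the data and the UDF body are placeholders:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 10, 1.0), (1, 20, 2.0), (2, 10, 3.0), (2, 10, 5.0)],
    ("id", "additional_key", "v"))

@pandas_udf("id long, additional_key long, v double", PandasUDFType.GROUPED_MAP)
def subtract_group_mean(pdf):
    # pdf holds every column of one (id, additional_key) group
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby("id", "additional_key").apply(subtract_group_mean).show()
```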


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22305: [WIP][SPARK-24561][SQL][Python] User-defined window aggr...

2018-08-31 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22305
  
The current state is a minimum working version - I copied some code from 
`WindowExec` to make this work but will need to refactor those.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22305: [WIP][SPARK-24561][SQL][Python] User-defined wind...

2018-08-31 Thread icexelloss
GitHub user icexelloss opened a pull request:

https://github.com/apache/spark/pull/22305

[WIP][SPARK-24561][SQL][Python] User-defined window aggregation functions 
with Pandas UDF (bounded window)

## What changes were proposed in this pull request?

### **This is currently WIP** 

This PR implements a new feature - window aggregation Pandas UDF for 
bounded window.

Example:
```
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def avg(v):
    return v.mean()

# df is assumed to have columns 'id' and 'v'
w = Window.partitionBy('id').rowsBetween(-2, 3)

result1 = df.withColumn('mean_v', avg(df['v']).over(w))
```

## How was this patch tested?

New tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/icexelloss/spark 
SPARK-24561-bounded-window-udf

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22305.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22305


commit 4a4ba9406b7cb393eb4397083f872ee463eca97e
Author: Li Jin 
Date:   2018-08-29T20:39:16Z

wip

commit f9e73265dee746e3192a858bea1b5eca6a1b1826
Author: Li Jin 
Date:   2018-08-30T14:49:16Z

Remove empty line

commit 28414fb9f099bd063dc24cb36878d006ccd2d53d
Author: Li Jin 
Date:   2018-08-31T14:51:09Z

Initial commit (WIP)

commit 51a9dcf0166d68ce7fbdc380a686426eeff5ebb6
Author: Li Jin 
Date:   2018-08-31T15:22:44Z

Fix case for unbounded window




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22208: [SPARK-25216][SQL] Improve error message when a column c...

2018-08-29 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22208
  
@dongjoon-hyun SGTM. I misunderstood your suggestion about resolver. 
Keeping it simple was my preference too.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22208: [SPARK-25216][SQL] Improve error message when a column c...

2018-08-28 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22208
  
@dongjoon-hyun Could you please take another look? I changed the code to use the 
resolver to try to resolve the column with backticks, and added unit tests as well.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in FileSo...

2018-08-28 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22104
  
Thanks all for the review!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22244: [WIP][SPARK-24721][SPARK-25213][SQL] extract python UDF ...

2018-08-27 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22244
  
@cloud-fan Thanks! I will take a look later today and incorporate this into 
my patch. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22208: [SPARK-25216][SQL] Improve error message when a c...

2018-08-24 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22208#discussion_r212716787
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -216,8 +216,16 @@ class Dataset[T] private[sql](
   private[sql] def resolve(colName: String): NamedExpression = {
 queryExecution.analyzed.resolveQuoted(colName, 
sparkSession.sessionState.analyzer.resolver)
   .getOrElse {
-throw new AnalysisException(
-  s"""Cannot resolve column name "$colName" among 
(${schema.fieldNames.mkString(", ")})""")
+if (schema.fieldNames.contains(colName)) {
+  throw new AnalysisException(
+s"""Cannot resolve column name "$colName" among 
(${schema.fieldNames.mkString(", ")}).
+   | Try adding backticks to the column name, i.e., 
`$colName`"""
--- End diff --

I see, how about:
```
Try adding backticks to the column name, i.e., `$colName`, if $colName is 
the name of the whole column
```
I am fine with either one



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22208: [SPARK-25216][SQL] Improve error message when a c...

2018-08-24 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22208#discussion_r212629188
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -216,8 +216,16 @@ class Dataset[T] private[sql](
   private[sql] def resolve(colName: String): NamedExpression = {
 queryExecution.analyzed.resolveQuoted(colName, 
sparkSession.sessionState.analyzer.resolver)
   .getOrElse {
-throw new AnalysisException(
-  s"""Cannot resolve column name "$colName" among 
(${schema.fieldNames.mkString(", ")})""")
+if (schema.fieldNames.contains(colName)) {
+  throw new AnalysisException(
+s"""Cannot resolve column name "$colName" among 
(${schema.fieldNames.mkString(", ")}).
+   | Try adding backticks to the column name, i.e., 
`$colName`"""
--- End diff --

@HyukjinKwon Thanks for the review!

Sorry I don't quite understand your sentence here:
>  if the name parts in the column should be kept as the part of its column 
name

Would you mind elaborating on what you mean?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...

2018-08-23 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22104#discussion_r212460124
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3367,6 +3367,35 @@ def test_ignore_column_of_all_nulls(self):
 finally:
 shutil.rmtree(path)
 
+# SPARK-24721
+def test_datasource_with_udf_filter_lit_input(self):
+import pandas as pd
+import numpy as np
+from pyspark.sql.functions import udf, pandas_udf, lit, col
+
+path = tempfile.mkdtemp()
+shutil.rmtree(path)
+try:
+
self.spark.range(1).write.mode("overwrite").format('csv').save(path)
+filesource_df = self.spark.read.csv(path)
--- End diff --

Created separate tests for pandas_udf under ScalarPandasUDFTests


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in FileSo...

2018-08-23 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22104
  
@HyukjinKwon I addressed the comments. Do you mind taking another look?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22208: Improve error message when a column containing do...

2018-08-23 Thread icexelloss
GitHub user icexelloss opened a pull request:

https://github.com/apache/spark/pull/22208

Improve error message when a column containing dot cannot be resolved

## What changes were proposed in this pull request?

The current error message is often confusing to new Spark users, who may not 
realize that a column name containing "." needs to be quoted with backticks. 

For example, consider the following code:
```
spark.range(0, 1).toDF('a.b')['a.b']
```

the current message looks like:

```
Cannot resolve column name "a.b" among (a.b)
```
This PR improves the error message to:

```
Cannot resolve column name "a.b" among (a.b). Try adding backticks to the 
column name, i.e., `a.b`;
``` 
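
For example, with the backticks suggested by the new message the lookup goes through (a quick sketch):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(0, 1).toDF('a.b')

# df['a.b'] raises AnalysisException and now carries the backtick hint
df['`a.b`']  # backticks resolve the single column whose name contains a dot
```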

## How was this patch tested?

Manual test in shell


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/icexelloss/spark 
SPARK-25216-backticks-error-message

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22208.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22208


commit 21a3732104f311785946a0808dbc132f0e7a892e
Author: Li Jin 
Date:   2018-08-23T18:42:37Z

Improve error message when a column containing dot cannot be resolved




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...

2018-08-23 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22104#discussion_r212396812
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3367,6 +3367,33 @@ def test_ignore_column_of_all_nulls(self):
 finally:
 shutil.rmtree(path)
 
+# SPARK-24721
+def test_datasource_with_udf_filter_lit_input(self):
+from pyspark.sql.functions import udf, lit, col
+
+path = tempfile.mkdtemp()
+shutil.rmtree(path)
+try:
+
self.spark.range(1).write.mode("overwrite").format('csv').save(path)
+filesource_df = self.spark.read.csv(path)
+datasource_df = self.spark.read \
+.format("org.apache.spark.sql.sources.SimpleScanSource") \
+.option('from', 0).option('to', 1).load()
+datasource_v2_df = self.spark.read \
+
.format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \
--- End diff --

Added checks to skip the tests if scala tests are not compiled


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...

2018-08-23 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22104#discussion_r212347966
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 ---
@@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chil
   }
 }.toArray
   }.toArray
-  val projection = newMutableProjection(allInputs, child.output)
+
+  // Project input rows to unsafe row so we can put it in the row queue
+  val unsafeProjection = UnsafeProjection.create(child.output, 
child.output)
--- End diff --

Created https://jira.apache.org/jira/browse/SPARK-25213


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...

2018-08-23 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22104#discussion_r212340459
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 ---
@@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chil
   }
 }.toArray
   }.toArray
-  val projection = newMutableProjection(allInputs, child.output)
+
+  // Project input rows to unsafe row so we can put it in the row queue
+  val unsafeProjection = UnsafeProjection.create(child.output, 
child.output)
--- End diff --

@cloud-fan Sorry, I don't think I am being very clear...

> If the data source does not produce UnsafeRow, Spark will make sure there 
will be a project
> above it to produce UnsafeRow

I don't think this is happening for datasource V2 right now:

(Code running in pyspark test)
```
datasource_v2_df = self.spark.read \

.format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \
.load()
result = datasource_v2_df.withColumn('x', udf(lambda x: x, 
'int')(datasource_v2_df['i']))
result.show()
```
The code above fails with:
```
Caused by: java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to 
org.apache.spark.sql.catalyst.expressions.UnsafeRow
at 
org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1$$anonfun$5.apply(EvalPythonExec.scala:127)
at 
org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1$$anonfun$5.apply(EvalPythonExec.scala:126)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at 
scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1074)
```

I think this is an issue with DataSourceV2 that probably should be 
addressed in another PR (DataSourceV1 works fine). @cloud-fan WDYT?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...

2018-08-23 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22104#discussion_r212309541
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 ---
@@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chil
   }
 }.toArray
   }.toArray
-  val projection = newMutableProjection(allInputs, child.output)
+
+  // Project input rows to unsafe row so we can put it in the row queue
+  val unsafeProjection = UnsafeProjection.create(child.output, 
child.output)
--- End diff --

Thanks! I will remove this then.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream forma...

2018-08-22 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r211964996
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
@@ -183,34 +178,106 @@ private[sql] object ArrowConverters {
   }
 
   /**
-   * Convert a byte array to an ArrowRecordBatch.
+   * Load a serialized ArrowRecordBatch.
*/
-  private[arrow] def byteArrayToBatch(
+  private[arrow] def loadBatch(
   batchBytes: Array[Byte],
   allocator: BufferAllocator): ArrowRecordBatch = {
-val in = new ByteArrayReadableSeekableByteChannel(batchBytes)
-val reader = new ArrowFileReader(in, allocator)
-
-// Read a batch from a byte stream, ensure the reader is closed
-Utils.tryWithSafeFinally {
-  val root = reader.getVectorSchemaRoot  // throws IOException
-  val unloader = new VectorUnloader(root)
-  reader.loadNextBatch()  // throws IOException
-  unloader.getRecordBatch
-} {
-  reader.close()
-}
+val in = new ByteArrayInputStream(batchBytes)
+MessageSerializer.deserializeRecordBatch(
+  new ReadChannel(Channels.newChannel(in)), allocator)  // throws 
IOException
   }
 
+  /**
+   * Create a DataFrame from a JavaRDD of serialized ArrowRecordBatches.
+   */
   private[sql] def toDataFrame(
-  payloadRDD: JavaRDD[Array[Byte]],
+  arrowBatchRDD: JavaRDD[Array[Byte]],
   schemaString: String,
   sqlContext: SQLContext): DataFrame = {
-val rdd = payloadRDD.rdd.mapPartitions { iter =>
+val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
+val timeZoneId = sqlContext.sessionState.conf.sessionLocalTimeZone
+val rdd = arrowBatchRDD.rdd.mapPartitions { iter =>
   val context = TaskContext.get()
-  ArrowConverters.fromPayloadIterator(iter.map(new ArrowPayload(_)), 
context)
+  ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, context)
 }
-val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
 sqlContext.internalCreateDataFrame(rdd, schema)
   }
+
+  /**
+   * Read a file as an Arrow stream and parallelize as an RDD of 
serialized ArrowRecordBatches.
+   */
+  private[sql] def readArrowStreamFromFile(
+  sqlContext: SQLContext,
+  filename: String): JavaRDD[Array[Byte]] = {
+val fileStream = new FileInputStream(filename)
+try {
+  // Create array so that we can safely close the file
+  val batches = getBatchesFromStream(fileStream.getChannel).toArray
+  // Parallelize the record batches to create an RDD
+  JavaRDD.fromRDD(sqlContext.sparkContext.parallelize(batches, 
batches.length))
+} finally {
+  fileStream.close()
+}
+  }
+
+  /**
+   * Read an Arrow stream input and return an iterator of serialized 
ArrowRecordBatches.
+   */
+  private[sql] def getBatchesFromStream(in: SeekableByteChannel): 
Iterator[Array[Byte]] = {
+
+// Create an iterator to get each serialized ArrowRecordBatch from a 
stream
+new Iterator[Array[Byte]] {
+  var batch: Array[Byte] = readNextBatch()
+
+  override def hasNext: Boolean = batch != null
+
+  override def next(): Array[Byte] = {
+val prevBatch = batch
+batch = readNextBatch()
+prevBatch
+  }
+
+  def readNextBatch(): Array[Byte] = {
+val msgMetadata = MessageSerializer.readMessage(new 
ReadChannel(in))
+if (msgMetadata == null) {
+  return null
+}
+
+// Get the length of the body, which has not be read at this point
+val bodyLength = msgMetadata.getMessageBodyLength.toInt
+
+// Only care about RecordBatch data, skip Schema and unsupported 
Dictionary messages
+if (msgMetadata.getMessage.headerType() == 
MessageHeader.RecordBatch) {
+
+  // Create output backed by buffer to hold msg length (int32), 
msg metadata, msg body
+  val bbout = new ByteBufferOutputStream(4 + 
msgMetadata.getMessageLength + bodyLength)
--- End diff --

Add a comment that this is the serialized form of an Arrow record batch? 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...

2018-08-21 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22104#discussion_r211733007
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 ---
@@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chil
   }
 }.toArray
   }.toArray
-  val projection = newMutableProjection(allInputs, child.output)
+
+  // Project input rows to unsafe row so we can put it in the row queue
+  val unsafeProjection = UnsafeProjection.create(child.output, 
child.output)
--- End diff --

Friendly ping @cloud-fan. Do you think forcing an unsafe projection here to deal 
with non-unsafe rows from data sources is correct? Is there a way to know 
whether the child nodes output unsafe rows, so we can avoid an unnecessary unsafe 
projection here? 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...

2018-08-17 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22104#discussion_r210996331
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3367,6 +3367,33 @@ def test_ignore_column_of_all_nulls(self):
 finally:
 shutil.rmtree(path)
 
+# SPARK-24721
+def test_datasource_with_udf_filter_lit_input(self):
+from pyspark.sql.functions import udf, lit, col
+
+path = tempfile.mkdtemp()
+shutil.rmtree(path)
+try:
+
self.spark.range(1).write.mode("overwrite").format('csv').save(path)
+filesource_df = self.spark.read.csv(path)
+datasource_df = self.spark.read \
+.format("org.apache.spark.sql.sources.SimpleScanSource") \
+.option('from', 0).option('to', 1).load()
+datasource_v2_df = self.spark.read \
+
.format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \
--- End diff --

Hmm... I think this is a bit fragile because of things like "scala-2.11" 
(the Scala version can change).

It seems a bit over-complicated to do this properly - when do we expect the pyspark 
tests to run without the Scala test classes compiled?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...

2018-08-17 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22104#discussion_r210955687
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3367,6 +3367,33 @@ def test_ignore_column_of_all_nulls(self):
 finally:
 shutil.rmtree(path)
 
+# SPARK-24721
+def test_datasource_with_udf_filter_lit_input(self):
+from pyspark.sql.functions import udf, lit, col
+
+path = tempfile.mkdtemp()
+shutil.rmtree(path)
+try:
+
self.spark.range(1).write.mode("overwrite").format('csv').save(path)
+filesource_df = self.spark.read.csv(path)
+datasource_df = self.spark.read \
+.format("org.apache.spark.sql.sources.SimpleScanSource") \
+.option('from', 0).option('to', 1).load()
+datasource_v2_df = self.spark.read \
+
.format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \
--- End diff --

I can probably try to check for the existence of 
sql/core/target/scala-2.11/test-classes.
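
A rough sketch of the check I have in mind (the class name and the hard-coded path, including the Scala version, are just placeholders):

```python
import os
import unittest

# hypothetical location of the compiled Scala test classes
_scala_test_classes = os.path.join(
    "sql", "core", "target", "scala-2.11", "test-classes")

class DataSourceWithUDFTests(unittest.TestCase):

    @unittest.skipIf(
        not os.path.isdir(_scala_test_classes),
        "Scala test classes are not compiled")
    def test_datasource_with_udf_filter_lit_input(self):
        pass  # actual test body omitted
```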


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...

2018-08-17 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22104#discussion_r210954447
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3367,6 +3367,33 @@ def test_ignore_column_of_all_nulls(self):
 finally:
 shutil.rmtree(path)
 
+# SPARK-24721
+def test_datasource_with_udf_filter_lit_input(self):
+from pyspark.sql.functions import udf, lit, col
+
+path = tempfile.mkdtemp()
+shutil.rmtree(path)
+try:
+
self.spark.range(1).write.mode("overwrite").format('csv').save(path)
+filesource_df = self.spark.read.csv(path)
+datasource_df = self.spark.read \
+.format("org.apache.spark.sql.sources.SimpleScanSource") \
+.option('from', 0).option('to', 1).load()
+datasource_v2_df = self.spark.read \
+
.format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \
--- End diff --

@HyukjinKwon I am actually not sure how pyspark finds these classes or how to 
check for their existence - do you have an example?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in FileSo...

2018-08-16 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22104
  
Tests pass now. This comment 
https://github.com/apache/spark/pull/22104/files#r210414941 requires some 
attention. @cloud-fan Do you think this is the right way to handle 
GenericInternalRow inputs here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...

2018-08-15 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22104#discussion_r210414941
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 ---
@@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chil
   }
 }.toArray
   }.toArray
-  val projection = newMutableProjection(allInputs, child.output)
+
+  // Project input rows to unsafe row so we can put it in the row queue
+  val unsafeProjection = UnsafeProjection.create(child.output, 
child.output)
--- End diff --

This requires some discussion.

This is probably another bug I found while testing this - if the input node to 
EvalPythonExec doesn't produce UnsafeRow, the cast here will fail. I don't know 
if we require data sources to produce unsafe rows; if not, then this is a 
problem.

I also don't know if this will introduce an additional copy when the input is 
already an UnsafeRow - it seems like UnsafeProjection should be smart enough to skip the 
copy, but I am not sure that is actually the case.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...

2018-08-15 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22104#discussion_r210410738
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 ---
@@ -117,15 +117,16 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chil
   }
 }.toArray
   }.toArray
-  val projection = newMutableProjection(allInputs, child.output)
+  val projection = UnsafeProjection.create(allInputs, child.output)
   val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
 StructField(s"_$i", dt)
   })
 
   // Add rows to queue to join later with the result.
   val projectedRowIter = iter.map { inputRow =>
-queue.add(inputRow.asInstanceOf[UnsafeRow])
-projection(inputRow)
+val unsafeRow = projection(inputRow)
+queue.add(unsafeRow.asInstanceOf[UnsafeRow])
--- End diff --

Ok.. This seems to break existing tests. Need to look into it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...

2018-08-15 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22104#discussion_r210391237
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3367,6 +3367,35 @@ def test_ignore_column_of_all_nulls(self):
 finally:
 shutil.rmtree(path)
 
+# SPARK-24721
+def test_datasource_with_udf_filter_lit_input(self):
+import pandas as pd
+import numpy as np
+from pyspark.sql.functions import udf, pandas_udf, lit, col
+
+path = tempfile.mkdtemp()
+shutil.rmtree(path)
+try:
+
self.spark.range(1).write.mode("overwrite").format('csv').save(path)
+filesource_df = self.spark.read.csv(path)
--- End diff --

@gatorsmile Added tests for file source, data source and data source v2. I 
might need to move the pandas_udf tests into another test class because of 
the Arrow requirement :(


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...

2018-08-15 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22104#discussion_r210390770
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -133,6 +134,9 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with 
PredicateHelper {
   }
 
   def apply(plan: SparkPlan): SparkPlan = plan transformUp {
+// SPARK-24721: Ignore Python UDFs in DataSourceScan and 
DataSourceV2Scan
+case plan: DataSourceScanExec => plan
--- End diff --

I got rid of the logic previously in `FileSourceStrategy` that excluded 
PythonUDF from the filters, in favor of this fix - I think this fix is cleaner. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...

2018-08-15 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22104#discussion_r210390399
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 ---
@@ -117,15 +117,16 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chil
   }
 }.toArray
   }.toArray
-  val projection = newMutableProjection(allInputs, child.output)
+  val projection = UnsafeProjection.create(allInputs, child.output)
   val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
 StructField(s"_$i", dt)
   })
 
   // Add rows to queue to join later with the result.
   val projectedRowIter = iter.map { inputRow =>
-queue.add(inputRow.asInstanceOf[UnsafeRow])
-projection(inputRow)
+val unsafeRow = projection(inputRow)
+queue.add(unsafeRow.asInstanceOf[UnsafeRow])
--- End diff --

This is probably another bug I found while testing this - if the input node to 
EvalPythonExec doesn't produce UnsafeRow, the cast here will fail. 

I found this in testing when I passed in a test data source scan node, which 
produces GenericInternalRow and therefore throws an exception here.

I am happy to submit this as a separate patch if people think it's 
necessary 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in FileSo...

2018-08-15 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22104
  
Thanks @HyukjinKwon and @cloud-fan ! I will take a look


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in FileSo...

2018-08-14 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22104
  
I think another way to fix this is to move the logic into `ExtractPythonUDFs` 
and ignore `FileSourceScanExec`, `DataSourceScanExec` and `DataSourceV2ScanExec` there, 
instead of changing all three rules. The downside is that if one of these scan exec nodes 
with a Python UDF pushed filter throws an exception somewhere else, we need to fix 
that too. Not sure which way is better. Either way, it would be good to 
create test cases with data source and data source V2... I would appreciate some 
advice on how to create such relations in a test.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in FileSo...

2018-08-14 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22104
  
@gatorsmile Can you advise on how to create a DataFrame backed by a data source 
relation? All my attempts end up triggering FileSourceStrategy, not DataSourceStrategy.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in FileSo...

2018-08-14 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22104
  
@gatorsmile Possibly, let me see if I can create a test case 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...

2018-08-14 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22104#discussion_r210052093
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3367,6 +3367,24 @@ def test_ignore_column_of_all_nulls(self):
 finally:
 shutil.rmtree(path)
 
+def test_datasource_with_udf_filter_lit_input(self):
--- End diff --

Makes sense. Will add.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in FileSo...

2018-08-14 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22104
  
retest please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in FileSo...

2018-08-14 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/22104
  
cc @cloud-fan . Followed your suggestion here: 
https://issues.apache.org/jira/browse/SPARK-24721?focusedCommentId=16560537=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16560537


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...

2018-08-14 Thread icexelloss
GitHub user icexelloss opened a pull request:

https://github.com/apache/spark/pull/22104

[SPARK-24721][SQL] Exclude Python UDFs filters in FileSourceStrategy 

## What changes were proposed in this pull request?
The PR excludes Python UDF filters in FileSourceStrategy so that they don't 
cause the ExtractPythonUDFs rule to throw an exception. It doesn't make sense to pass 
Python UDF filters to FileSourceStrategy anyway, because they cannot be used as 
push-down filters.
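
For reference, the failing query has roughly this shape (a sketch based on the new regression test; the UDF body is a placeholder):

```python
import tempfile
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, lit

spark = SparkSession.builder.getOrCreate()
path = tempfile.mkdtemp()
spark.range(1).write.mode("overwrite").format("csv").save(path)

filesource_df = spark.read.csv(path)
keep_all = udf(lambda x: True, 'boolean')  # placeholder Python UDF

# a Python UDF filter over a literal input used to hit the exception in ExtractPythonUDFs
filesource_df.filter(keep_all(lit(1))).count()
```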

## How was this patch tested?
Add a new regression test



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/icexelloss/spark SPARK-24721-udf-filter

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22104.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22104


commit 512f4b64cb7662baa23995c6f6c109a735ec8f5e
Author: Li Jin 
Date:   2018-08-14T14:22:50Z

Fix file strategy to exclude python UDF filters




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...

2018-07-31 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/21928
  
I see. Yeah sounds good to me.

On Tue, Jul 31, 2018 at 12:30 PM Hyukjin Kwon 
wrote:

> I think we shouldn't change minimum PyArrow version in 2.4.0 and the
> upgrade doesn't require to change it as far as I remember.
>
> We should backport this anyway even if we will upgrade it for 2.4.0. let's
> go ahead.
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/21928#issuecomment-409283599>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/AAwbrI0nm_g9FHFrQFonGQtzvh7HeQQ6ks5uMIYJgaJpZM4VnjD_>
> .
>



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21887: [SPARK-23633][SQL] Update Pandas UDFs section in sql-pro...

2018-07-31 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/21887
  
Thanks! @HyukjinKwon 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...

2018-07-31 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/21928
  
@HyukjinKwon The Arrow 0.10.0 release is around the corner. I think Spark 2.4 
will very likely ship with 0.10.0 (where I believe this issue has been 
fixed - @BryanCutler can you confirm?).

I am not sure it's necessary to have this patch just for pyarrow 0.9.0 ...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21887: [SPARK-23633][SQL] Update Pandas UDFs section in sql-pro...

2018-07-30 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/21887
  
@HyukjinKwon I manually generated the doc and it looks good to me.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21887: [SPARK-23633][SQL] Update Pandas UDFs section in ...

2018-07-28 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21887#discussion_r205943208
  
--- Diff: examples/src/main/python/sql/arrow.py ---
@@ -113,6 +113,42 @@ def substract_mean(pdf):
 # $example off:grouped_map_pandas_udf$
 
 
+def grouped_agg_pandas_udf_example(spark):
+# $example on:grouped_agg_pandas_udf$
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+from pyspark.sql import Window
+
+df = spark.createDataFrame(
+[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+("id", "v"))
+
+@pandas_udf("double", PandasUDFType.GROUPED_AGG)
+def mean_udf(v):
+return v.mean()
+df.groupby("id").agg(mean_udf(df['v'])).show()
+# +---+---+
+# | id|mean_udf(v)|
+# +---+---+
+# |  1|1.5|
+# |  2|6.0|
+# +---+---+
+
+w = Window \
+.partitionBy('id') \
+.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
+# +---++--+
+# | id|   v|mean_v|
+# +---++--+
+# |  1| 1.0|   1.5|
+# |  1| 2.0|   1.5|
+# |  2| 3.0|   6.0|
+# |  2| 5.0|   6.0|
+# |  2|10.0|   6.0|
+# +---++--+
+# $example off:grouped_map_pandas_udf$
--- End diff --

Ah good catch, my bad. Let me try building the doc.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

2018-07-28 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/21650
  
Thanks @HyukjinKwon @BryanCutler for the review!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

2018-07-27 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/21650
  
retest please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

2018-07-27 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21650#discussion_r205872386
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -94,36 +95,52 @@ object ExtractPythonUDFFromAggregate extends 
Rule[LogicalPlan] {
  */
 object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
 
-  private def hasPythonUDF(e: Expression): Boolean = {
+  private type EvalType = Int
+  private type EvalTypeChecker = EvalType => Boolean
+
+  private def hasScalarPythonUDF(e: Expression): Boolean = {
 e.find(PythonUDF.isScalarPythonUDF).isDefined
   }
 
   private def canEvaluateInPython(e: PythonUDF): Boolean = {
 e.children match {
   // single PythonUDF child could be chained and evaluated in Python
-  case Seq(u: PythonUDF) => canEvaluateInPython(u)
+  case Seq(u: PythonUDF) => e.evalType == u.evalType && 
canEvaluateInPython(u)
   // Python UDF can't be evaluated directly in JVM
-  case children => !children.exists(hasPythonUDF)
+  case children => !children.exists(hasScalarPythonUDF)
 }
   }
 
-  private def collectEvaluatableUDF(expr: Expression): Seq[PythonUDF] = 
expr match {
-case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && 
canEvaluateInPython(udf) => Seq(udf)
-case e => e.children.flatMap(collectEvaluatableUDF)
+  private def collectEvaluableUDFsFromExpressions(expressions: 
Seq[Expression]): Seq[PythonUDF] = {
+// Eval type checker is set once when we find the first evaluable UDF 
and its value
+// shouldn't change later.
+// Used to check if subsequent UDFs are of the same type as the first 
UDF. (since we can only
+// extract UDFs of the same eval type)
+var evalTypeChecker: Option[EvalTypeChecker] = None
+
+def collectEvaluableUDFs(expr: Expression): Seq[PythonUDF] = expr 
match {
+  case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && 
canEvaluateInPython(udf)
+&& evalTypeChecker.isEmpty =>
+evalTypeChecker = Some((otherEvalType: EvalType) => otherEvalType 
== udf.evalType)
+Seq(udf)
--- End diff --

@HyukjinKwon In your code this line is `collectEvaluableUDFs (udf)`. I 
think we should just return `Seq(udf)` to avoid checking the expression twice.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

2018-07-27 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21650#discussion_r205866645
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -94,36 +95,61 @@ object ExtractPythonUDFFromAggregate extends 
Rule[LogicalPlan] {
  */
 object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
 
-  private def hasPythonUDF(e: Expression): Boolean = {
+  private case class EvalTypeHolder(private var evalType: Int = -1) {
--- End diff --

Ok, I will update the code then.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

2018-07-27 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21650#discussion_r205859891
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -94,36 +95,61 @@ object ExtractPythonUDFFromAggregate extends 
Rule[LogicalPlan] {
  */
 object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
 
-  private def hasPythonUDF(e: Expression): Boolean = {
+  private case class EvalTypeHolder(private var evalType: Int = -1) {
--- End diff --

I see... You use a var and a nested function definition to remove 
the need for a holder object. 

IMHO I usually find nested function definitions, and functions that refer to 
variables outside their definition scope, hard to read, but that could be my personal 
preference. 

Another thing I like about the current impl is that the `EvalTypeHolder` class 
ensures its value is never changed once it's set, so I think that's more robust.

That being said, I am ok with your suggestion too if you insist or if 
@BryanCutler also prefers it.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21887: [SPARK-23633][SQL] Update Pandas UDFs section in sql-pro...

2018-07-27 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/21887
  
Thanks @HyukjinKwon ! I addressed the comments.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

2018-07-27 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/21650
  
@BryanCutler @HyukjinKwon I updated the PR based on Bryan's suggestion. 
Please take a look and let me know if you have further comments.

Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

2018-07-26 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/21650
  
@HyukjinKwon I think Bryan's implementation looks promising. Let me take a 
look.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21887: [SPARK-23633][SQL] Update Pandas UDFs section in ...

2018-07-26 Thread icexelloss
GitHub user icexelloss opened a pull request:

https://github.com/apache/spark/pull/21887

[SPARK-23633][SQL] Update Pandas UDFs section in sql-programming-guide

## What changes were proposed in this pull request?

Update Pandas UDFs section in sql-programming-guide. Add section for 
grouped aggregate pandas UDF.

## How was this patch tested?



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/icexelloss/spark 
SPARK-23633-sql-programming-guide

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21887.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21887


commit 2d30b2d00458093a9a4a5941d4f66be2da9c6b97
Author: Li Jin 
Date:   2018-07-26T19:19:54Z

Update Pandas UDF section for grouped aggregate




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

2018-07-26 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21650#discussion_r205448677
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends 
Rule[LogicalPlan] {
  */
 object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
 
-  private def hasPythonUDF(e: Expression): Boolean = {
+  private case class LazyEvalType(var evalType: Int = -1) {
+
+def isSet: Boolean = evalType >= 0
+
+def set(evalType: Int): Unit = {
+  if (isSet) {
+throw new IllegalStateException("Eval type has already been set")
+  } else {
+this.evalType = evalType
+  }
+}
+
+def get(): Int = {
+  if (!isSet) {
+throw new IllegalStateException("Eval type is not set")
+  } else {
+evalType
+  }
+}
+  }
+
+  private def hasScalarPythonUDF(e: Expression): Boolean = {
 e.find(PythonUDF.isScalarPythonUDF).isDefined
   }
 
-  private def canEvaluateInPython(e: PythonUDF): Boolean = {
-e.children match {
-  // single PythonUDF child could be chained and evaluated in Python
-  case Seq(u: PythonUDF) => canEvaluateInPython(u)
-  // Python UDF can't be evaluated directly in JVM
-  case children => !children.exists(hasPythonUDF)
+  /**
+   * Check whether a PythonUDF expression can be evaluated in Python.
+   *
+   * If the lazy eval type is not set, this method checks for either 
a Batched Python UDF or a Scalar
+   * Pandas UDF. If the lazy eval type is set, this method checks for the 
expression of the
+   * specified eval type.
+   *
+   * This method will also set the lazy eval type to be the type of the 
first evaluable expression,
+   * i.e., if lazy eval type is not set and we find an evaluable Python UDF 
expression, lazy eval
+   * type will be set to the eval type of the expression.
+   *
+   */
+  private def canEvaluateInPython(e: PythonUDF, lazyEvalType: 
LazyEvalType): Boolean = {
--- End diff --

I applied your new code but the test I mentioned above still fails.

I think the issue is that when visiting `f2(f1(col('v')))`, firstEvalType 
is set to Scalar Pandas first and is never changed to Batched SQL afterwards, 
so f1 is not extracted. It's possible that my code is still different from 
yours somehow.

But similar to 
https://github.com/apache/spark/pull/21650#issuecomment-407951457, I think the 
state machine of the eval type holder object is fairly complicated with your 
suggested implementation (i.e., what is the expected state of the holder at 
each point, and what are the invariants of the algorithm?), and I found myself 
thinking pretty hard to convince myself it is correct in all cases. If we want 
to go with this implementation, we need to think it through carefully and 
explain it in the code...

The lazyEvalType implementation is better IMHO because its state machine is 
simpler: lazyEvalType stays empty until we find the first evaluable UDF, and 
its value never changes after that.

The first implementation (two pass, immutable state) is probably the simplest 
in terms of the mental complexity of the algorithm, but it is less efficient.

I am OK with either the immutable state or the lazy state. I think 
@HyukjinKwon prefers the immutable one. @BryanCutler WDYT?
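
To make the trade-off concrete, here is a minimal, self-contained toy sketch 
(plain Python, not Spark code) of the "two pass, immutable state" idea: pass 
one decides a single eval type up front and never mutates it, pass two 
extracts only the UDFs of that type, and the rule simply runs again for 
whatever remains of a mixed chain like `f2(f1(v))`. The `UDF`/`Col` classes 
and eval type names below are made up for illustration.

```
SQL_BATCHED, SCALAR_PANDAS = "SQL_BATCHED", "SCALAR_PANDAS"

class Col:
    def __init__(self, name):
        self.name = name

class UDF:
    def __init__(self, name, eval_type, children):
        self.name = name
        self.eval_type = eval_type
        self.children = children

def first_eval_type(expr):
    # Pass 1: eval type of the bottom-most UDF, decided once and never mutated.
    if isinstance(expr, UDF):
        for child in expr.children:
            t = first_eval_type(child)
            if t is not None:
                return t
        return expr.eval_type
    return None

def extract(expr, eval_type):
    # Pass 2: pull out UDFs of `eval_type` whose inputs contain no other UDFs,
    # replacing each with a placeholder column (stand-in for an attribute ref).
    extracted = []
    def visit(e):
        if isinstance(e, UDF):
            children = [visit(c) for c in e.children]
            if e.eval_type == eval_type and all(isinstance(c, Col) for c in children):
                extracted.append(e.name)
                return Col("_" + e.name)
            return UDF(e.name, e.eval_type, children)
        return e
    return visit(expr), extracted

# f2 (Scalar Pandas) applied on top of f1 (Batched Python): f2(f1(v))
expr = UDF("f2", SCALAR_PANDAS, [UDF("f1", SQL_BATCHED, [Col("v")])])

t = first_eval_type(expr)
expr, first_batch = extract(expr, t)
print(t, first_batch)    # SQL_BATCHED ['f1']

t = first_eval_type(expr)
expr, second_batch = extract(expr, t)
print(t, second_batch)   # SCALAR_PANDAS ['f2']
```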


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

2018-07-26 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21650#discussion_r205445392
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -5060,6 +5049,147 @@ def test_type_annotation(self):
 df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], 
returnType='bigint')('id'))
 self.assertEqual(df.first()[0], 0)
 
+def test_mixed_udf(self):
+import pandas as pd
+from pyspark.sql.functions import col, udf, pandas_udf
+
+df = self.spark.range(0, 1).toDF('v')
+
+# Test mixture of multiple UDFs and Pandas UDFs
+
+@udf('int')
+def f1(x):
+assert type(x) == int
+return x + 1
+
+@pandas_udf('int')
+def f2(x):
+assert type(x) == pd.Series
+return x + 10
+
+@udf('int')
+def f3(x):
+assert type(x) == int
+return x + 100
+
+@pandas_udf('int')
+def f4(x):
+assert type(x) == pd.Series
+return x + 1000
+
+# Test mixed udfs in a single projection
+df1 = df \
+.withColumn('f1', f1(col('v'))) \
+.withColumn('f2', f2(col('v'))) \
+.withColumn('f3', f3(col('v'))) \
+.withColumn('f4', f4(col('v'))) \
+.withColumn('f2_f1', f2(col('f1'))) \
+.withColumn('f3_f1', f3(col('f1'))) \
+.withColumn('f4_f1', f4(col('f1'))) \
+.withColumn('f3_f2', f3(col('f2'))) \
+.withColumn('f4_f2', f4(col('f2'))) \
+.withColumn('f4_f3', f4(col('f3'))) \
+.withColumn('f3_f2_f1', f3(col('f2_f1'))) \
+.withColumn('f4_f2_f1', f4(col('f2_f1'))) \
+.withColumn('f4_f3_f1', f4(col('f3_f1'))) \
+.withColumn('f4_f3_f2', f4(col('f3_f2'))) \
+.withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1')))
+
+# Test mixed udfs in a single expression
+df2 = df \
+.withColumn('f1', f1(col('v'))) \
+.withColumn('f2', f2(col('v'))) \
+.withColumn('f3', f3(col('v'))) \
+.withColumn('f4', f4(col('v'))) \
+.withColumn('f2_f1', f2(f1(col('v')))) \
+.withColumn('f3_f1', f3(f1(col('v')))) \
+.withColumn('f4_f1', f4(f1(col('v')))) \
+.withColumn('f3_f2', f3(f2(col('v')))) \
+.withColumn('f4_f2', f4(f2(col('v')))) \
+.withColumn('f4_f3', f4(f3(col('v')))) \
+.withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \
+.withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \
+.withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \
+.withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \
+.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v'))))))
+
+# expected result
+df3 = df \
+.withColumn('f1', df['v'] + 1) \
+.withColumn('f2', df['v'] + 10) \
+.withColumn('f3', df['v'] + 100) \
+.withColumn('f4', df['v'] + 1000) \
+.withColumn('f2_f1', df['v'] + 11) \
+.withColumn('f3_f1', df['v'] + 101) \
+.withColumn('f4_f1', df['v'] + 1001) \
+.withColumn('f3_f2', df['v'] + 110) \
+.withColumn('f4_f2', df['v'] + 1010) \
+.withColumn('f4_f3', df['v'] + 1100) \
+.withColumn('f3_f2_f1', df['v'] + 111) \
+.withColumn('f4_f2_f1', df['v'] + 1011) \
+.withColumn('f4_f3_f1', df['v'] + 1101) \
+.withColumn('f4_f3_f2', df['v'] + 1110) \
+.withColumn('f4_f3_f2_f1', df['v'] + 1111)
+
+self.assertEquals(df3.collect(), df1.collect())
+self.assertEquals(df3.collect(), df2.collect())
+
+def test_mixed_udf_and_sql(self):
+import pandas as pd
+from pyspark.sql.functions import udf, pandas_udf
+
+df = self.spark.range(0, 1).toDF('v')
+
+# Test mixture of UDFs, Pandas UDFs and SQL expression.
+
+@udf('int')
+def f1(x):
+assert type(x) == int
+return x + 1
+
+def f2(x):
+return x + 10
+
+@pandas_udf('int')
+def f3(x):
+assert type(x) == pd.Series
+return x + 100
+
+df1 = df.withColumn('f1', f1(df['v'])) \
+.withColumn('f2', f2(df['v'])) \
+.withColumn('f3', f3(df['v'])) \
+.withColumn('f1_f2', f1(f2(df['v']))) \
+.withColumn('f1_f3', f1(f3(df['v

[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

2018-07-25 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21650#discussion_r205268767
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends 
Rule[LogicalPlan] {
  */
 object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
 
-  private def hasPythonUDF(e: Expression): Boolean = {
+  private case class LazyEvalType(var evalType: Int = -1) {
+
+def isSet: Boolean = evalType >= 0
+
+def set(evalType: Int): Unit = {
+  if (isSet) {
+throw new IllegalStateException("Eval type has already been set")
+  } else {
+this.evalType = evalType
+  }
+}
+
+def get(): Int = {
+  if (!isSet) {
+throw new IllegalStateException("Eval type is not set")
+  } else {
+evalType
+  }
+}
+  }
+
+  private def hasScalarPythonUDF(e: Expression): Boolean = {
 e.find(PythonUDF.isScalarPythonUDF).isDefined
   }
 
-  private def canEvaluateInPython(e: PythonUDF): Boolean = {
-e.children match {
-  // single PythonUDF child could be chained and evaluated in Python
-  case Seq(u: PythonUDF) => canEvaluateInPython(u)
-  // Python UDF can't be evaluated directly in JVM
-  case children => !children.exists(hasPythonUDF)
+  /**
+   * Check whether a PythonUDF expression can be evaluated in Python.
+   *
+   * If the lazy eval type is not set, this method checks for either 
a Batched Python UDF or a Scalar
+   * Pandas UDF. If the lazy eval type is set, this method checks for the 
expression of the
+   * specified eval type.
+   *
+   * This method will also set the lazy eval type to be the type of the 
first evaluable expression,
+   * i.e., if lazy eval type is not set and we find an evaluable Python UDF 
expression, lazy eval
+   * type will be set to the eval type of the expression.
+   *
+   */
+  private def canEvaluateInPython(e: PythonUDF, lazyEvalType: 
LazyEvalType): Boolean = {
--- End diff --

Yes it's in the most recent commit.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

2018-07-25 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21650#discussion_r205262719
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends 
Rule[LogicalPlan] {
  */
 object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
 
-  private def hasPythonUDF(e: Expression): Boolean = {
+  private case class LazyEvalType(var evalType: Int = -1) {
+
+def isSet: Boolean = evalType >= 0
+
+def set(evalType: Int): Unit = {
+  if (isSet) {
+throw new IllegalStateException("Eval type has already been set")
+  } else {
+this.evalType = evalType
+  }
+}
+
+def get(): Int = {
+  if (!isSet) {
+throw new IllegalStateException("Eval type is not set")
+  } else {
+evalType
+  }
+}
+  }
+
+  private def hasScalarPythonUDF(e: Expression): Boolean = {
 e.find(PythonUDF.isScalarPythonUDF).isDefined
   }
 
-  private def canEvaluateInPython(e: PythonUDF): Boolean = {
-e.children match {
-  // single PythonUDF child could be chained and evaluated in Python
-  case Seq(u: PythonUDF) => canEvaluateInPython(u)
-  // Python UDF can't be evaluated directly in JVM
-  case children => !children.exists(hasPythonUDF)
+  /**
+   * Check whether a PythonUDF expression can be evaluated in Python.
+   *
+   * If the lazy eval type is not set, this method checks for either 
a Batched Python UDF or a Scalar
+   * Pandas UDF. If the lazy eval type is set, this method checks for the 
expression of the
+   * specified eval type.
+   *
+   * This method will also set the lazy eval type to be the type of the 
first evaluable expression,
+   * i.e., if lazy eval type is not set and we find an evaluable Python UDF 
expression, lazy eval
+   * type will be set to the eval type of the expression.
+   *
+   */
+  private def canEvaluateInPython(e: PythonUDF, lazyEvalType: 
LazyEvalType): Boolean = {
--- End diff --

Bryan, I tried to apply your implementation and the simple test also fails:

```
@udf('int')
def f1(x):
    assert type(x) == int
    return x + 1

@pandas_udf('int')
def f2(x):
    assert type(x) == pd.Series
    return x + 10

df_chained_1 = df.withColumn('f2_f1', f2(f1(df['v'])))
expected_chained_1 = df.withColumn('f2_f1', df['v'] + 11)
self.assertEquals(expected_chained_1.collect(), df_chained_1.collect())
```

Do you mind trying this too? Hopefully I didn't do something silly here..


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


