[GitHub] spark pull request: [SPARK-1442][SQL][WIP] Window Function Support...

2015-04-29 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/5604#issuecomment-97518465
  
Hi,

I have been experimenting with Window functions in Spark SQL as well; my work 
is partially based on this PR. You can find it 
[here](https://github.com/hvanhovell/spark-window).

I have deviated from the original implementation in a couple of ways:
- Implemented it as an extension to Spark SQL rather than Hive. All aggregates 
use the Spark SQL implementations (not the Hive UDAFs).
- Use of the Spark 1.4 child ordering requirements. Sorting is planned by the 
engine; this will be especially interesting as soon as Exchange starts 
supporting secondary sorting. I have tried a few sorting schemes, and this one 
is currently the fastest.
- Only a single window specification (grouping and ordering) is processed at a 
time. The analyzer should take care of multiple window specifications.
- The current implementation is semi-blocking; it processes one group at a 
time, which means only the rows for one group per partition are kept in memory. 
In the future we should also accommodate the case in which all aggregates are 
streaming (perhaps with some buffering).

Shall we try to join forces, and come up with one good PR?

Kind regards,
Herman





[GitHub] spark pull request: [SPARK-7322] [SQL] [WIP] Support Window Functi...

2015-05-12 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/6104#issuecomment-101519756
  
Hi,

In the JIRA the following example is given:
```
df.select(
  df.store,
  df.date,
  df.sales,
  avg(df.sales).over.partitionBy(df.store)
    .orderBy(df.store)
    .rowsFollowing(0)  // this means from unbounded preceding to current row
)
```
Is there a reason why the aggregation operation has moved from the beginning 
(the style in the JIRA) to the end (the style above)? Are both still possible? 
I'd prefer the former, since it is a bit shorter and more recognizable coming 
from SQL.

On a related note: would it also be an idea to be able to create a separate 
window (groupBy/orderBy) definition and use this definition in one or more 
windowed aggregates? For example:
```
val window = partitionBy($"store").orderBy($"date")
df.select(
  $"store",
  $"date",
  sum($"sales").over(window).rowsFollowing(0).as("TotalSales"),
  sum($"sales").over(window).rowsFollowing(0).rowsPreceding(2).as("SalesLast3M")
)
```
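
For reference, here is roughly how such a reusable window specification looks 
with the ```Window``` builder in ```org.apache.spark.sql.expressions``` (a 
sketch assuming a Spark version where that API is available; frame boundaries 
are expressed with ```rowsBetween```, where ```Long.MinValue``` stands for 
unbounded preceding):
```
// Assumes a DataFrame `df` with store, date and sales columns, and
// `import sqlContext.implicits._` for the $"..." column syntax.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val byStore = Window.partitionBy($"store").orderBy($"date")

df.select(
  $"store",
  $"date",
  // running total: unbounded preceding up to the current row
  sum($"sales").over(byStore.rowsBetween(Long.MinValue, 0)).as("TotalSales"),
  // moving sum over the two preceding rows and the current row
  sum($"sales").over(byStore.rowsBetween(-2, 0)).as("SalesLast3M")
)
```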






[GitHub] spark pull request: [SPARK-7712] [SQL] Move Window Functions from ...

2015-05-20 Thread hvanhovell
GitHub user hvanhovell opened a pull request:

https://github.com/apache/spark/pull/6278

[SPARK-7712] [SQL] Move Window Functions from Hive UDAFs to Spark Native 
backend [WIP]

This PR aims to improve the current window functionality in Spark SQL in 
the following ways:
- Moving away from Hive-backed UDAFs to a more Spark SQL native 
implementation. The main advantages are that some overhead can be avoided, that 
it is easier to extend, and that it opens up the opportunity to do more 
aggressive code generation (AggregateEvaluation style).
- Improve the processing of the 'between unbounded and current' and the 
'between current and unbounded' cases. This used to be very expensive; it now 
runs in linear time.
- Process different frames for the same group by/order by definition in the 
same window clause (illustrated below). This should save some memory and reduce 
some of the overhead.

This is currently a work in progress. Quite a bit of testing needs to be 
done. Any feedback will be greatly appreciated. The JIRA ticket can be found 
here: https://issues.apache.org/jira/browse/SPARK-7712
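
For illustration, here is a query in which two window functions share the same 
partitioning and ordering but use different frames, so a single Window operator 
can eventually evaluate both (a sketch assuming a HiveContext where window 
functions are available; the ```sales``` table and its columns are made up):
```
// Both window functions use PARTITION BY store ORDER BY date, but with
// different frames: a running total and a three-row moving sum.
val framed = sqlContext.sql("""
  SELECT store, date, sales,
         SUM(sales) OVER (PARTITION BY store ORDER BY date
                          ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS total_sales,
         SUM(sales) OVER (PARTITION BY store ORDER BY date
                          ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sales_last_3
  FROM sales""")
```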



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-7712

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/6278.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6278


commit e4b81d5276076ce1f8130b56c557ff9a4e19dce8
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-05-20T07:30:38Z

Moved Code from separate project into Spark code base. Added HiveQL parsing.







[GitHub] spark pull request: [SPARK-7712] [SQL] Move Window Functions from ...

2015-06-10 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/6278#issuecomment-110769725
  
Thanks for starting the test process.

I'll take a look at the merge issues today.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-06-27 Thread hvanhovell
GitHub user hvanhovell opened a pull request:

https://github.com/apache/spark/pull/7057

[SPARK-8638] [SQL] Window Function Performance Improvements

## Description
Performance improvements for Spark Window functions. This PR will also 
serve as the basis for moving away from Hive UDAFs to Spark UDAFs. See JIRA 
tickets SPARK-8638 and SPARK-7712 for more information.

The original work, including some benchmarking code for the running case, can 
be found here: https://github.com/hvanhovell/spark-window

## Improvements
* Much better performance (10x) in running cases (e.g. BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW) and UNBOUNDED FOLLOWING cases. The current 
implementation in Spark uses a sliding window approach in these cases. This 
means that an aggregate is maintained for every row, so space usage is N (N 
being the number of rows), and that all these aggregates need to be updated 
separately, which takes N*(N-1)/2 updates. The running case differs from the 
sliding case because we are only adding data to an aggregate function (no reset 
is required): we only need to maintain one aggregate (as in the UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING case), update that aggregate for each row, 
and get the aggregate value after each update. This is what the new 
implementation does; it only uses one buffer and only requires N updates. I am 
currently working on data with window sizes of 500-1000 doing running sums, and 
this saves a lot of time. The CURRENT ROW AND UNBOUNDED FOLLOWING case also 
uses this approach and the fact that aggregate operations are commutative; the 
one twist is that it processes the input buffer in reverse (see the sketch 
after this list).
* Fewer comparisons in the sliding case. The current implementation 
determines frame boundaries for every input row. The new implementation makes 
more use of the fact that the window is sorted, maintains the boundaries, and 
only moves them when the current row order changes. This is a minor improvement.
* A single Window node is able to process all types of Frames for the same 
Partitioning/Ordering. This saves a little time/memory spent buffering and 
managing partitions. This will be enabled in a follow-up PR.
* A lot of the staging code is moved from the execution phase to the 
initialization phase. Minor performance improvement, and improves readability 
of the execution code.
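
To make the running case concrete, here is a minimal sketch of the idea 
outside of Spark, assuming the rows of one partition are already sorted: a 
single accumulator is updated once per row and its value is emitted after every 
update, giving N updates instead of N*(N-1)/2. The CURRENT ROW AND UNBOUNDED 
FOLLOWING case is the same computation applied to the reversed input (with the 
results reversed back).
```
// Running sum over one sorted partition: one buffer, one pass.
def runningSums(sortedSales: Seq[Double]): Seq[Double] = {
  var acc = 0.0
  sortedSales.map { amount =>
    acc += amount  // update the single aggregate buffer
    acc            // emit its value after every update
  }
}

// runningSums(Seq(10.0, 5.0, 7.0)) == Seq(10.0, 15.0, 22.0)
```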

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-8638

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7057.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7057


commit f161920218c880e4f2804b19db129936d0d34d61
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-06-27T15:39:53Z

Major overhaul of Window operator.

commit 22e51d39c833d0e69a788ad41b8bd790688b9bd9
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-06-27T15:54:00Z

Fixed aggregation and range betweens with unbounded test - two different 
window frames were compared.

commit ad7820c6ac04f188e8a6239d314b680c8a6b4551
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-06-27T16:00:23Z

Disabled Tests 42 & 43 because of tiny numerical differences in answers.







[GitHub] spark pull request: [SPARK-7712] [SQL] Move Window Functions from ...

2015-06-12 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/6278#issuecomment-111584747
  
A small status update:
- The PR currently passes all tests.
- The code has been rebased, but this is a moving target due to the frequent 
commits to the SQL code in general, and to the FunctionRegistry in particular.

There are a few things left:
- The use of Expression trees instead of Expression classes for the more 
advanced window functions needs to be discussed.
- Some of the tests can be moved from Hive to Core.

Other things can be handled in follow-up PRs.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-06-30 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r33641164
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +59,622 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
--- End diff --

Not really. I made a ton of changes (it is more of a rewrite than a small 
patch). I can put the documentation back in; the functionality can be found in 
lines 70-76.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-06-30 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r33641261
  
--- Diff: 
sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
 ---
@@ -749,7 +749,7 @@ abstract class HiveWindowFunctionQueryBaseSuite extends 
HiveComparisonTest with
   |range between current row and unbounded following) as s1
   |from part
 .stripMargin, reset = false)
-
+  */
--- End diff --

I think this is due to the reverse-order processing of these frames. The 
differences are truly tiny: a few ULPs.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-06-30 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r33641591
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
 ---
@@ -189,7 +189,7 @@ class HiveDataFrameWindowSuite extends QueryTest {
   df.select(
 $key,
 last(value).over(
-  Window.partitionBy($value).orderBy($key).rangeBetween(1, 
Long.MaxValue))
+  Window.partitionBy($value).orderBy($key).rangeBetween(-1, 0))
--- End diff --

Yes, it was wrong. The default behavior when Hive parses this partial frame 
definition '..RANGE 1 preceding...' is to expand it into '..RANGE BETWEEN 1 
PRECEDING AND CURRENT ROW...', and not into '..RANGE BETWEEN 1 FOLLOWING AND 
UNBOUNDED PRECEDING...'. So the test was wrong.

But this still strikes me as odd: how did this pass previous tests? Did the 
default behavior change?
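
For reference, a sketch of how these frame definitions map onto the DataFrame 
API (assuming the Spark 1.4 ```Window```/```WindowSpec``` builder, where 
negative offsets mean preceding and 0 means the current row; 
```import sqlContext.implicits._``` is assumed for the ```$``` syntax):
```
import org.apache.spark.sql.expressions.Window

// RANGE BETWEEN 1 PRECEDING AND CURRENT ROW
// (what Hive expands the partial "RANGE 1 preceding" definition into)
val onePrecedingToCurrent =
  Window.partitionBy($"value").orderBy($"key").rangeBetween(-1, 0)

// RANGE BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING
// (what the old rangeBetween(1, Long.MaxValue) in the test actually expressed)
val oneFollowingToUnbounded =
  Window.partitionBy($"value").orderBy($"key").rangeBetween(1, Long.MaxValue)
```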





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-06-30 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r33641018
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -19,17 +19,39 @@ package org.apache.spark.sql.execution
 
 import java.util
 
-import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.rdd.RDD
 import org.apache.spark.util.collection.CompactBuffer
+import scala.collection.mutable
 
 /**
  * :: DeveloperApi ::
- * For every row, evaluates `windowExpression` containing Window Functions 
and attaches
- * the results with other regular expressions (presented by `projectList`).
- * Evert operator handles a single Window Specification, `windowSpec`.
+ * This class calculates and outputs (windowed) aggregates over the rows 
in a single sorted group.
+ * The aggregates are calculated for each row in the group. An aggregate 
can take a few forms:
+ * - Global: The aggregate is calculated for the entire group. Every row 
has the same value.
+ * - Rows: The aggregate is calculated based on a subset of the window, 
and is unique for each
+ * row and depends on the position of the given row within the window. The 
group must be sorted
+ * for this to produce sensible output. Examples are moving averages, 
running sums and row
+ * numbers.
+ * - Range: The aggregate is calculated based on a subset of the window, 
and is unique for each
+ * value of the order by clause and depends on its ordering. The group 
must be sorted for this to
+ * produce sensible output.
+ * - Shifted: The aggregate is a displaced value relative to the position 
of the given row.
+ * Examples are Lead and Lag.
--- End diff --

I agree. There are still a few other documentation inconsistencies, and 
I'll try to fix those as well.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-06-30 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r33642234
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +59,622 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
--- End diff --

It is on line 77 in the new version.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-06-30 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r33642464
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -19,17 +19,39 @@ package org.apache.spark.sql.execution
 
 import java.util
 
-import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.rdd.RDD
 import org.apache.spark.util.collection.CompactBuffer
+import scala.collection.mutable
 
 /**
  * :: DeveloperApi ::
- * For every row, evaluates `windowExpression` containing Window Functions 
and attaches
- * the results with other regular expressions (presented by `projectList`).
- * Evert operator handles a single Window Specification, `windowSpec`.
+ * This class calculates and outputs (windowed) aggregates over the rows 
in a single sorted group.
+ * The aggregates are calculated for each row in the group. An aggregate 
can take a few forms:
+ * - Global: The aggregate is calculated for the entire group. Every row 
has the same value.
+ * - Rows: The aggregate is calculated based on a subset of the window, 
and is unique for each
+ * row and depends on the position of the given row within the window. The 
group must be sorted
+ * for this to produce sensible output. Examples are moving averages, 
running sums and row
+ * numbers.
+ * - Range: The aggregate is calculated based on a subset of the window, 
and is unique for each
+ * value of the order by clause and depends on its ordering. The group 
must be sorted for this to
+ * produce sensible output.
+ * - Shifted: The aggregate is a displaced value relative to the position 
of the given row.
+ * Examples are Lead and Lag.
--- End diff --

The PR also optimizes the processing of moving and shrinking frames:
* For moving-frame processing the number of comparisons is reduced. This didn't 
look like the most rewarding improvement, but I was surprised to find it 
improved performance by quite a margin.
* Shrinking frames are indeed processed in reverse order, which makes building 
them as fast as the growing case (it uses more memory though). I share your 
concerns, and solving this at the root (the function itself) would indeed be 
best. I'll revert this for now and file a JIRA ticket for future reference.





[GitHub] spark pull request: [SPARK-9740] [SPARK-9592] [SQL] Change the def...

2015-08-12 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/8113#discussion_r36873052
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
 ---
@@ -130,24 +147,67 @@ case class First(child: Expression) extends 
AlgebraicAggregate {
 
   private val first = AttributeReference(first, child.dataType)()
 
-  override val bufferAttributes = first :: Nil
+  private val valueSet = AttributeReference(valueSet, BooleanType)()
+
+  override val bufferAttributes = first :: valueSet :: Nil
 
   override val initialValues = Seq(
-/* first = */ Literal.create(null, child.dataType)
+/* first = */ Literal.create(null, child.dataType),
+/* valueSet = */ Literal.create(false, BooleanType)
   )
 
-  override val updateExpressions = Seq(
-/* first = */ If(IsNull(first), child, first)
-  )
+  override val updateExpressions = {
+val litTrue = Literal.create(true, BooleanType)
+if (ignoreNulls) {
+  Seq(
+/* first = */ If(Or(valueSet, IsNull(child)), first, child),
+/* valueSet = */ If(Or(valueSet, IsNull(child)), valueSet, litTrue)
--- End diff --

Nit: ```Or(valueSet, Not(IsNull(child)))``` is a bit shorter.





[GitHub] spark pull request: [SPARK-9740] [SPARK-9592] [SQL] Change the def...

2015-08-12 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/8113#discussion_r36873218
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
 ---
@@ -130,24 +147,67 @@ case class First(child: Expression) extends 
AlgebraicAggregate {
 
   private val first = AttributeReference(first, child.dataType)()
 
-  override val bufferAttributes = first :: Nil
+  private val valueSet = AttributeReference(valueSet, BooleanType)()
+
+  override val bufferAttributes = first :: valueSet :: Nil
 
   override val initialValues = Seq(
-/* first = */ Literal.create(null, child.dataType)
+/* first = */ Literal.create(null, child.dataType),
+/* valueSet = */ Literal.create(false, BooleanType)
   )
 
-  override val updateExpressions = Seq(
-/* first = */ If(IsNull(first), child, first)
-  )
+  override val updateExpressions = {
+val litTrue = Literal.create(true, BooleanType)
+if (ignoreNulls) {
+  Seq(
+/* first = */ If(Or(valueSet, IsNull(child)), first, child),
+/* valueSet = */ If(Or(valueSet, IsNull(child)), valueSet, litTrue)
+  )
+} else {
+  Seq(
+/* first = */ If(valueSet, first, child),
+/* valueSet = */ litTrue
+  )
+}
+  }
 
-  override val mergeExpressions = Seq(
-/* first = */ If(IsNull(first.left), first.right, first.left)
-  )
+  override val mergeExpressions = {
+val litTrue = Literal.create(true, BooleanType)
+if (ignoreNulls) {
+  Seq(
+/* first = */ If(Or(valueSet.left, IsNull(first.right)), 
first.left, first.right),
+/* valueSet = */ If(Or(valueSet.left, IsNull(first.right)), 
valueSet.left, litTrue)
+  )
+} else {
+  Seq(
+/* first = */ If(valueSet.left, first.left, first.right),
+/* valueSet = */ litTrue
--- End diff --

Is it possible that two non-updated aggregates get merged? If it is, then 
this should be: ```Or(valueSet.left, valueSet.right)```
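
A tiny self-contained illustration of the concern, assuming merges of 
never-updated partial buffers can happen: combining the flags with 
```Or(valueSet.left, valueSet.right)``` keeps the merged buffer "empty" only 
when both sides are empty, whereas hard-coding ```true``` would mark it as set.
```
// Merging the valueSet flags of two partial aggregation buffers.
def mergeValueSet(left: Boolean, right: Boolean): Boolean = left || right

assert(!mergeValueSet(false, false)) // two empty buffers stay empty
assert(mergeValueSet(false, true))
assert(mergeValueSet(true, false))
```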





[GitHub] spark pull request: [SPARK-9740] [SQL] Change the default behavior...

2015-08-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/8113#discussion_r36818586
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
 ---
@@ -130,23 +138,36 @@ case class First(child: Expression) extends 
AlgebraicAggregate {
 
   private val first = AttributeReference(first, child.dataType)()
 
-  override val bufferAttributes = first :: Nil
+  private val valueSet = AttributeReference(valueSet, BooleanType)()
+
+  override val bufferAttributes = first :: valueSet :: Nil
 
   override val initialValues = Seq(
-/* first = */ Literal.create(null, child.dataType)
+/* first = */ Literal.create(null, child.dataType),
+/* valueSet = */ Literal.create(false, BooleanType)
   )
 
   override val updateExpressions = Seq(
-/* first = */ If(IsNull(first), child, first)
+/* first = */ If(valueSet, first, child),
+/* valueSet = */ If(valueSet, valueSet, Literal.create(true, 
BooleanType))
   )
 
   override val mergeExpressions = Seq(
-/* first = */ If(IsNull(first.left), first.right, first.left)
+/* first = */ If(valueSet, first.left, first.right),
+/* valueSet = */ If(valueSet, valueSet, Literal.create(true, 
BooleanType))
--- End diff --

We could for now simplify this to ```Literal.create(true, BooleanType)```.





[GitHub] spark pull request: [SPARK-9740] [SPARK-9592] [SQL] Change the def...

2015-08-12 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/8113#issuecomment-130343660
  
One more small thing: we should probably also add the ```ignoreNulls``` 
option to the ```first``` and ```last``` DataFrame functions.
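
A sketch of what those overloads could look like on 
```org.apache.spark.sql.functions``` (hypothetical signatures at the time of 
this discussion; Spark later gained overloads along these lines):
```
// Hypothetical signatures:
//   def first(e: Column, ignoreNulls: Boolean): Column
//   def last(e: Column, ignoreNulls: Boolean): Column

// Usage sketch:
df.groupBy($"key").agg(
  first($"value", ignoreNulls = true).as("first_non_null"),
  last($"value", ignoreNulls = true).as("last_non_null")
)
```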





[GitHub] spark pull request: [SPARK-10100] [SQL] Performance improvements to...

2015-08-18 Thread hvanhovell
GitHub user hvanhovell opened a pull request:

https://github.com/apache/spark/pull/8298

[SPARK-10100] [SQL] Performance improvements to new MIN/MAX aggregate 
functions.

The new MIN/MAX functions suffer from a performance regression. This PR aims 
to fix that by simplifying their evaluation.

See the JIRA [ticket](https://issues.apache.org/jira/browse/SPARK-10100) 
for more information.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-10100

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/8298.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #8298


commit 2fed4dcdddbb22d676a78795a3778c6f02a229ab
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-08-19T02:36:32Z

Performance tweaks to the Min/Max functions: removed a branch in their 
evaluation.







[GitHub] spark pull request: [SPARK-9741][SQL] Approximate Count Distinct u...

2015-08-21 Thread hvanhovell
GitHub user hvanhovell opened a pull request:

https://github.com/apache/spark/pull/8362

[SPARK-9741][SQL] Approximate Count Distinct using the new UDAF interface.

This PR implements a HyperLogLog based Approximate Count Distinct function 
using the new UDAF interface.

The implementation is inspired by the ClearSpring HyperLogLog 
implementation and should produce the same results.

There is still some documentation and testing left to do.

cc @yhuai

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-9741

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/8362.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #8362


commit f52de0236fbfb85053bd6538b12025ba23d34ee1
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-08-21T02:32:15Z

Created HyperLogLog aggregate.

commit 8ec27b93da4f2d39da7c72f7a88c2742cd38318a
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-08-21T12:57:27Z

Added HLL to conversions. More doc. Improvement.

commit e178d9e942720c879dc947a5fd3e6593a351f04f
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-08-21T17:56:39Z

Bug fixes. Style. Documentation.







[GitHub] spark pull request: [SPARK-9741][SQL] Approximate Count Distinct u...

2015-08-21 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/8362#issuecomment-133566608
  
Thanks.

I was aiming for compatibility with the existing approxCountDistinct, but 
we can also implement HLL++. HLL++ introduces three (orthogonal) refinements: 
64-bit hashing, better low cardinality corrections and a sparse encoding 
scheme. The first two refinements are easy to add. The third will require a bit 
more effort.

Unit testing this is a bit of a challenge. End-to-end (blackbox) testing is 
no problem, as long as we know what the result should be, or if we do random 
testing (results should be within 5% of the actual value). Testing parts of the 
algorithm is a bit of a PITA:
* It is hard to reason about the results (the updated registers) HLL 
produces.
* Register access code and HLL code are intertwined.

Both the 
[ClearSpring](https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLog.java)
 and 
[AggregateKnowledge](https://github.com/aggregateknowledge/java-hll/blob/master/src/main/java/net/agkn/hll/HLL.java)
 implementations resort to blackbox testing. I will create some blackbox tests.
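
A sketch of the kind of blackbox test mentioned above: generate data with a 
known cardinality and require the estimate to stay within the 5% bound 
(assuming the existing ```approxCountDistinct``` function and a ```sqlContext``` 
that supports ```range```):
```
import org.apache.spark.sql.functions.approxCountDistinct

// Blackbox test: the estimate for a known cardinality must be within 5%.
val n = 100000L
val df = sqlContext.range(0, n)  // single LongType column named "id"
val estimate = df.select(approxCountDistinct("id")).head().getLong(0)
val relativeError = math.abs(estimate.toDouble - n) / n
assert(relativeError < 0.05, s"estimate $estimate is off by more than 5%")
```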





[GitHub] spark pull request: [SPARK-8640] [SQL] Enable Processing of Multip...

2015-07-30 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7515#discussion_r35946650
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -818,7 +818,8 @@ class Analyzer(
   failAnalysis(s$expr has multiple Window Specifications 
($distinctWindowSpec). +
 sPlease file a bug report with this error message, stack 
trace, and the query.)
 } else {
-  distinctWindowSpec.head
+  val unbounded = SpecifiedWindowFrame(RowFrame, 
UnboundedPreceding, UnboundedFollowing)
--- End diff --

I think extracting the grouping by and order by clauses is better, and it 
also conveys the message that any Window operator should be capable of 
processing multiple frames. The impact of the PR will be a bit larger in that 
case; I'll have a look at it tomorrow.





[GitHub] spark pull request: [SPARK-9482] [SQL] Fix thread-safey issue of u...

2015-08-05 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7940#discussion_r36343368
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
 ---
@@ -47,7 +47,7 @@ case class BroadcastNestedLoopJoin(
   override def outputsUnsafeRows: Boolean = left.outputsUnsafeRows || 
right.outputsUnsafeRows
   override def canProcessUnsafeRows: Boolean = true
 
-  @transient private[this] lazy val resultProjection: InternalRow = 
InternalRow = {
+  @transient private[this] def genResultProjection: InternalRow = 
InternalRow = {
--- End diff --

```@transient def```? Does that work? Maybe not the most constructive 
feedback, but it stood out...





[GitHub] spark pull request: [SPARK-7712] [SQL] Move Window Functions from ...

2015-07-30 Thread hvanhovell
Github user hvanhovell closed the pull request at:

https://github.com/apache/spark/pull/6278





[GitHub] spark pull request: [SPARK-7712] [SQL] Move Window Functions from ...

2015-07-30 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/6278#issuecomment-126397148
  
This PR is redundant now. See SPARK-8638 SPARK-8640 and SPARK-8641 for the 
proposed implementation.





[GitHub] spark pull request: [SPARK-9729] [SPARK-9363] [WIP] [SQL] Use sort...

2015-08-06 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7904#discussion_r36487283
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
@@ -56,117 +52,247 @@ case class SortMergeJoin(
   @transient protected lazy val leftKeyGenerator = newProjection(leftKeys, 
left.output)
   @transient protected lazy val rightKeyGenerator = 
newProjection(rightKeys, right.output)
 
+  protected[this] def isUnsafeMode: Boolean = {
+// TODO(josh): there is an existing bug here: this should also check 
whether unsafe mode
+// is enabled. also, the default for self.codegenEnabled looks 
inconsistent to me.
+codegenEnabled && UnsafeProjection.canSupport(leftKeys) && UnsafeProjection.canSupport(schema)
+  }
+
+  // TODO(josh): this will need to change once we use an Unsafe row joiner
+  override def outputsUnsafeRows: Boolean = false
+  override def canProcessUnsafeRows: Boolean = isUnsafeMode
+  override def canProcessSafeRows: Boolean = !isUnsafeMode
+
   private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] = {
 // This must be ascending in order to agree with the `keyOrdering` 
defined in `doExecute()`.
 keys.map(SortOrder(_, Ascending))
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-val leftResults = left.execute().map(_.copy())
-val rightResults = right.execute().map(_.copy())
-
-leftResults.zipPartitions(rightResults) { (leftIter, rightIter) =>
+left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
   new Iterator[InternalRow] {
 // An ordering that can be used to compare keys from both sides.
 private[this] val keyOrdering = 
newNaturalAscendingOrdering(leftKeys.map(_.dataType))
-// Mutable per row objects.
+private[this] var currentLeftRow: InternalRow = _
+private[this] var currentRightMatches: CompactBuffer[InternalRow] 
= _
+private[this] var currentMatchIdx: Int = -1
+private[this] val smjScanner = new SortMergeJoinScanner(
+  leftKeyGenerator,
+  rightKeyGenerator,
+  keyOrdering,
+  leftIter,
+  rightIter
+)
 private[this] val joinRow = new JoinedRow
-private[this] var leftElement: InternalRow = _
-private[this] var rightElement: InternalRow = _
-private[this] var leftKey: InternalRow = _
-private[this] var rightKey: InternalRow = _
-private[this] var rightMatches: CompactBuffer[InternalRow] = _
-private[this] var rightPosition: Int = -1
-private[this] var stop: Boolean = false
-private[this] var matchKey: InternalRow = _
-
-// initialize iterator
-initialize()
-
-override final def hasNext: Boolean = nextMatchingPair()
-
-override final def next(): InternalRow = {
-  if (hasNext) {
-// we are using the buffered right rows and run down left 
iterator
-val joinedRow = joinRow(leftElement, 
rightMatches(rightPosition))
-rightPosition += 1
-if (rightPosition >= rightMatches.size) {
-  rightPosition = 0
-  fetchLeft()
-  if (leftElement == null || keyOrdering.compare(leftKey, 
matchKey) != 0) {
-stop = false
-rightMatches = null
-  }
-}
-joinedRow
-  } else {
-// no more result
-throw new NoSuchElementException
-  }
-}
 
-private def fetchLeft() = {
-  if (leftIter.hasNext) {
-leftElement = leftIter.next()
-leftKey = leftKeyGenerator(leftElement)
+override final def hasNext: Boolean =
+  (currentMatchIdx != -1 && currentMatchIdx < currentRightMatches.length) || fetchNext()
+
+private[this] def fetchNext(): Boolean = {
+  if (smjScanner.findNextInnerJoinRows()) {
+currentRightMatches = smjScanner.getBuildMatches
+currentLeftRow = smjScanner.getStreamedRow
+currentMatchIdx = 0
+true
   } else {
-leftElement = null
+currentRightMatches = null
+currentLeftRow = null
+currentMatchIdx = -1
+false
   }
 }
 
-private def fetchRight() = {
-  if (rightIter.hasNext) {
-rightElement = rightIter.next()
-rightKey = rightKeyGenerator(rightElement)
-  } else {
-rightElement

[GitHub] spark pull request: [SPARK-9357][SQL] Remove JoinedRow/Introduce J...

2015-08-04 Thread hvanhovell
GitHub user hvanhovell opened a pull request:

https://github.com/apache/spark/pull/7942

[SPARK-9357][SQL] Remove JoinedRow/Introduce JoinedProjection [WIP]

```JoinedRow```s are used to join two rows together, and are used in a lot of 
the most performance-critical sections of Spark. The problem with 
```JoinedRow``` is that it is an extra layer of indirection and that the 
current code has branches; both are serious performance bottlenecks.

This PR introduces ```JoinedProjection``` and replaces ```JoinedRow``` as 
the primary method of combining two rows. A ```JoinedProjection``` is a 
function that takes a left and a right row as its input, and combines these 
using the given expressions.

```JoinedRow``` cannot be removed because it provides the only way to do 
interpreted joined projections (Expression ```eval``` only takes one row as its 
argument), and because the code generation fallback relies on it.

The current implementation supports the interpreted and code-generated 
paths, and has been applied to all aggregate operators in Spark SQL. Other 
operators using ```JoinedRow``` (i.e. the *Join operators, Generate and 
PythonUDF) can be converted in follow-up PRs.
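
A conceptual sketch of the difference (not the PR's actual API): instead of 
wrapping the two input rows in a ```JoinedRow``` and evaluating expressions 
against the wrapper, a joined projection is a function specialized for the 
given expressions that produces the combined output row in a single call.
```
import org.apache.spark.sql.catalyst.InternalRow

object JoinedProjectionSketch {
  // Conceptual only: combine the left and right rows into the output row
  // directly, without the JoinedRow indirection on the hot path.
  type JoinedProjection = (InternalRow, InternalRow) => InternalRow
}
```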

cc @yhuai @rxin 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-9357

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7942.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7942


commit 4afec8c5d22cc3483e8331193aa52f4f6302b31f
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-08-04T05:56:26Z

WIP - Initial Commit. It compiles. Now make it work.

commit 0f1be99d3467d021829b7654d9efc986fc120fa7
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-08-04T15:51:51Z

Clean-up. Replaced non-joined generate path to two paths. Factored out some 
more expression support.

commit e6c5f076fd4bdeed6cd0b75764b6ba127b9fb84c
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-08-04T16:43:06Z

Removed Joined Row From Aggregate Operators.

commit 05914722bcd0a7508536470c86c1b61628674563
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-08-04T16:47:56Z

Style Fixes.

commit 81f11325eb512aa4fc986d323e16a36a9db85185
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-08-04T17:02:38Z

Non-Branching JoinedRow.

commit 296a073ede6c195ce6a08d9e1f84176d086dfd0c
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-08-04T20:30:42Z

Fix CodeGenFallback path. Bugfixes.







[GitHub] spark pull request: [SPARK-9980][BUILD] Fix SBT publishLocal error...

2015-08-14 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/8209#issuecomment-131206616
  
I have replaced ```<p/>``` tags by ```<p>``` in all Java files I could find 
them in. I haven't touched the ```BytesToByteMap```, ```Bin``` and 
```PagedTable``` classes because these are written in Scala and Scaladoc has no 
problem with ```<```'s.





[GitHub] spark pull request: [SPARK-9980][BUILD] Fix SBT publishLocal error...

2015-08-14 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/8209#issuecomment-131210873
  
I'd rather leave the scala source alone for now.





[GitHub] spark pull request: [SPARK-9980][BUILD] Fix SBT publishLocal error...

2015-08-14 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/8209#issuecomment-131198780
  
I'll change those as well. It is strange that these didn't create problems; 
I guess it has something to do with the position of the ```<``` in the line.





[GitHub] spark pull request: [SPARK-9980][BUILD] Fix SBT publishLocal error...

2015-08-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/8209#discussion_r37103311
  
--- Diff: 
launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java ---
@@ -211,10 +211,10 @@ public SparkLauncher addSparkArg(String arg) {
* Adds an argument with a value to the Spark invocation. If the 
argument name corresponds to
* a known argument, the code validates that the argument actually 
expects a value, and throws
* an exception otherwise.
-   * <p/>
+   * <p>
--- End diff --

So it turns out that javadoc in Java 8 doesn't allow self-closing elements 
(```<br/>``` and ```<p/>```) any more:

http://stackoverflow.com/questions/26049329/javadoc-in-jdk-8-invalid-self-closing-element-not-allowed

http://www.oracle.com/technetwork/java/javase/documentation/index-137868.html#format

```<p>``` is the preferred separator for a paragraph. So it is not HTML 
but Javadoc we are talking about. Sorry about the confusion.





[GitHub] spark pull request: [SPARK-9980][BUILD] Fix SBT publishLocal error...

2015-08-14 Thread hvanhovell
GitHub user hvanhovell opened a pull request:

https://github.com/apache/spark/pull/8209

[SPARK-9980][BUILD] Fix SBT publishLocal error due to invalid characters in 
doc

Tiny modification to a few comments to make ```sbt publishLocal``` work again.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-9980

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/8209.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #8209


commit 500fe15c3ef7c6e835d09dc9d9025c58c3467a76
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-08-14T17:18:42Z

Replace << by HTML friendly &lt;&lt;

commit 4a743c2cb7e6a4aa86e693f6b48e5528334ce5d6
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-08-14T17:33:09Z

Replace self-closing <p/> statements with <p>.







[GitHub] spark pull request: [SPARK-10001] [CORE] Allow Ctrl-C in spark-she...

2015-08-15 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/8216#discussion_r37136449
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala ---
@@ -17,6 +17,10 @@
 
 package org.apache.spark.scheduler
 
+import sun.misc.{Signal, SignalHandler}
--- End diff --

This dependency can cause both run-time and compile-time problems if the 
JVM/compiler used does not provide these classes. Does this work with other JVM 
implementations, such as OpenJDK and IBM JDK?

I am in favor of this functionality, but I think we should be very careful 
using platform-specific code. Luckily this is not the first time people have 
had to solve this problem; JLine solves it in the following way:

https://github.com/jline/jline2/blob/master/src/main/java/jline/console/ConsoleReader.java#L257-L284
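
For completeness, a minimal sketch of that JLine-style approach in Scala (an 
illustration under stated assumptions, not the PR's code): the handler is 
installed reflectively, so a JVM without ```sun.misc.Signal``` simply keeps the 
default Ctrl-C behavior instead of failing at compile or run time.
```
import java.lang.reflect.{InvocationHandler, Method, Proxy}

object SigIntHook {
  // Returns true if the handler was installed, false if sun.misc.* is missing.
  def install(action: () => Unit): Boolean =
    try {
      val signalClass  = Class.forName("sun.misc.Signal")
      val handlerClass = Class.forName("sun.misc.SignalHandler")
      val handler = Proxy.newProxyInstance(
        getClass.getClassLoader,
        Array(handlerClass),
        new InvocationHandler {
          override def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef =
            method.getName match {
              case "handle"   => action(); null
              case "toString" => "SigIntHook"
              case _          => null
            }
        })
      val sigInt = signalClass.getConstructor(classOf[String])
        .newInstance("INT").asInstanceOf[AnyRef]
      signalClass.getMethod("handle", signalClass, handlerClass)
        .invoke(null, sigInt, handler)
      true
    } catch {
      case _: Throwable => false // sun.misc.* not available: keep default behavior
    }
}
```
Something like ```SigIntHook.install(() => sc.cancelAllJobs())``` would then 
degrade gracefully on JVMs that do not ship these classes.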






[GitHub] spark pull request: [SPARK-10001] [CORE] Allow Ctrl-C in spark-she...

2015-08-15 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/8216#discussion_r37137085
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala ---
@@ -17,6 +17,10 @@
 
 package org.apache.spark.scheduler
 
+import sun.misc.{Signal, SignalHandler}
--- End diff --

I don't really think there is an easy way around the 
```sun.misc.Signal/SignalHandler``` approach.

I have to admit that my concern is moot if all major implementations 
support this (can anyone confirm this?). I just like the JLine approach better 
because it doesn't cause compile/run-time errors when the classes cannot be 
found; it will just default to the current behavior.





[GitHub] spark pull request: [SPARK-9740] [SQL] Change the default behavior...

2015-08-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/8113#discussion_r36818762
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
 ---
@@ -130,23 +138,36 @@ case class First(child: Expression) extends 
AlgebraicAggregate {
 
   private val first = AttributeReference(first, child.dataType)()
 
-  override val bufferAttributes = first :: Nil
+  private val valueSet = AttributeReference(valueSet, BooleanType)()
+
+  override val bufferAttributes = first :: valueSet :: Nil
 
   override val initialValues = Seq(
-/* first = */ Literal.create(null, child.dataType)
+/* first = */ Literal.create(null, child.dataType),
+/* valueSet = */ Literal.create(false, BooleanType)
   )
 
   override val updateExpressions = Seq(
-/* first = */ If(IsNull(first), child, first)
+/* first = */ If(valueSet, first, child),
+/* valueSet = */ If(valueSet, valueSet, Literal.create(true, 
BooleanType))
   )
 
   override val mergeExpressions = Seq(
-/* first = */ If(IsNull(first.left), first.right, first.left)
+/* first = */ If(valueSet, first.left, first.right),
+/* valueSet = */ If(valueSet, valueSet, Literal.create(true, 
BooleanType))
--- End diff --

```Or(valueSet.left, valueSet.right)```?





[GitHub] spark pull request: [SPARK-9740] [SQL] Change the default behavior...

2015-08-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/8113#discussion_r36818719
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
 ---
@@ -130,23 +138,36 @@ case class First(child: Expression) extends 
AlgebraicAggregate {
 
   private val first = AttributeReference("first", child.dataType)()
 
-  override val bufferAttributes = first :: Nil
+  private val valueSet = AttributeReference("valueSet", BooleanType)()
+
+  override val bufferAttributes = first :: valueSet :: Nil
 
   override val initialValues = Seq(
-/* first = */ Literal.create(null, child.dataType)
+/* first = */ Literal.create(null, child.dataType),
+/* valueSet = */ Literal.create(false, BooleanType)
   )
 
   override val updateExpressions = Seq(
-/* first = */ If(IsNull(first), child, first)
+/* first = */ If(valueSet, first, child),
+/* valueSet = */ If(valueSet, valueSet, Literal.create(true, 
BooleanType))
--- End diff --

We could for now simplify this to ```Literal.create(true, BooleanType)```.
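
Spelled out, the simplified pair would then look something like this (just a 
sketch using the attributes from the diff above, not a final patch):
```
  override val updateExpressions = Seq(
    /* first = */ If(valueSet, first, child),
    /* valueSet = */ Literal.create(true, BooleanType)
  )
```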


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9740] [SQL] Change the default behavior...

2015-08-11 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/8113#issuecomment-130128926
  
Besides the ```valueSet``` update/merge expressions, LGTM.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9740] [SQL] Change the default behavior...

2015-08-11 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/8113#issuecomment-130128263
  
LGTM. One final question: shouldn't we introduce a ```skipNulls``` 
parameter? Or do you want to address this in a follow-up PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9740] [SPARK-9592] [SQL] Change the def...

2015-08-12 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/8113#discussion_r36878009
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
 ---
@@ -130,24 +147,67 @@ case class First(child: Expression) extends 
AlgebraicAggregate {
 
   private val first = AttributeReference("first", child.dataType)()
 
-  override val bufferAttributes = first :: Nil
+  private val valueSet = AttributeReference("valueSet", BooleanType)()
+
+  override val bufferAttributes = first :: valueSet :: Nil
 
   override val initialValues = Seq(
-/* first = */ Literal.create(null, child.dataType)
+/* first = */ Literal.create(null, child.dataType),
+/* valueSet = */ Literal.create(false, BooleanType)
   )
 
-  override val updateExpressions = Seq(
-/* first = */ If(IsNull(first), child, first)
-  )
+  override val updateExpressions = {
+val litTrue = Literal.create(true, BooleanType)
+if (ignoreNulls) {
+  Seq(
+/* first = */ If(Or(valueSet, IsNull(child)), first, child),
+/* valueSet = */ If(Or(valueSet, IsNull(child)), valueSet, litTrue)
+  )
+} else {
+  Seq(
+/* first = */ If(valueSet, first, child),
+/* valueSet = */ litTrue
+  )
+}
+  }
 
-  override val mergeExpressions = Seq(
-/* first = */ If(IsNull(first.left), first.right, first.left)
-  )
+  override val mergeExpressions = {
+val litTrue = Literal.create(true, BooleanType)
+if (ignoreNulls) {
+  Seq(
+/* first = */ If(Or(valueSet.left, IsNull(first.right)), 
first.left, first.right),
--- End diff --

I don't think we need to check ```IsNull(first.right)```.

On a more general note: I think we don't need to separate the 
```ignoreNulls = false``` and ```ignoreNulls = true``` paths. This should be 
fine for both:
```
/* first = */ If(valueSet.left, first.left, first.right),
/* valueSet = */ Or(valueSet.left, valueSet.right)
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9740] [SPARK-9592] [SQL] Change the def...

2015-08-12 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/8113#discussion_r36877100
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
 ---
@@ -130,24 +147,67 @@ case class First(child: Expression) extends 
AlgebraicAggregate {
 
   private val first = AttributeReference("first", child.dataType)()
 
-  override val bufferAttributes = first :: Nil
+  private val valueSet = AttributeReference("valueSet", BooleanType)()
+
+  override val bufferAttributes = first :: valueSet :: Nil
 
   override val initialValues = Seq(
-/* first = */ Literal.create(null, child.dataType)
+/* first = */ Literal.create(null, child.dataType),
+/* valueSet = */ Literal.create(false, BooleanType)
   )
 
-  override val updateExpressions = Seq(
-/* first = */ If(IsNull(first), child, first)
-  )
+  override val updateExpressions = {
+val litTrue = Literal.create(true, BooleanType)
+if (ignoreNulls) {
+  Seq(
+/* first = */ If(Or(valueSet, IsNull(child)), first, child),
+/* valueSet = */ If(Or(valueSet, IsNull(child)), valueSet, litTrue)
+  )
+} else {
+  Seq(
+/* first = */ If(valueSet, first, child),
+/* valueSet = */ litTrue
+  )
+}
+  }
 
-  override val mergeExpressions = Seq(
-/* first = */ If(IsNull(first.left), first.right, first.left)
-  )
+  override val mergeExpressions = {
+val litTrue = Literal.create(true, BooleanType)
+if (ignoreNulls) {
+  Seq(
+/* first = */ If(Or(valueSet.left, IsNull(first.right)), 
first.left, first.right),
+/* valueSet = */ If(Or(valueSet.left, IsNull(first.right)), 
valueSet.left, litTrue)
--- End diff --

Nit: ```Or(valueSet.left, valueSet.right)``` is shorter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8641][SPARK-8712][SQL] Native Spark Win...

2015-07-27 Thread hvanhovell
GitHub user hvanhovell opened a pull request:

https://github.com/apache/spark/pull/7715

[SPARK-8641][SPARK-8712][SQL] Native Spark Window Functions [WIP]

This replaces the Hive UDAFs with native Spark SQL UDAFs using the new UDAF 
interface. See the JIRA ticket for more information.

This is currently a work in progress. I have created the PR in order to get 
feedback. There are still a number of open issues:
* Remove the Hive Window Function and old Window Function code path. This 
is completely replaced by the new interface.
* Improve the check analysis messages.
* A lot of testing, e.g. of the distinct aggregation path and the error messages.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-8641

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7715.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7715


commit 73ad5cb4a70b07656431aeec0b8ec8cb938aba81
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-07-22T22:06:30Z

Added WindowFunctions.

commit 160987d3808ce6d44b46660bb641962d5732754d
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-07-26T04:33:47Z

Move to Native Spark UDAFs for window processing.

commit 55aed3cf827326388c90262c0fb81f5b5bd83f19
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-07-26T04:45:33Z

Rebase & make it compile again.

commit 52c6319b5acdf838b997154b744595676e317668
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-07-26T16:56:36Z

Bug fixes

commit abba35fabb72fd0a93b8c86fc8a723b02cb6c9fb
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-07-27T19:30:47Z

Add NoOp to interpreted projections.

commit fedb164b97115a2481e9e74c94c1cf371a7246fe
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-07-28T03:52:33Z

Bugfixes... All tests work now.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8641][SPARK-8712][SQL] Native Spark Win...

2015-07-27 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/7715#issuecomment-125431976
  
cc @yhuai


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9066][SQL] Improve cartesian performanc...

2015-07-24 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7417#discussion_r35433399
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -213,10 +213,51 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   object CartesianProduct extends Strategy {
 def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
   case logical.Join(left, right, _, None) =>
-execution.joins.CartesianProduct(planLater(left), 
planLater(right)) :: Nil
+// For BroadcastCartesianProduct we will broadcast the small size 
plan,
+// for CartesianProduct we will use the small size plan as 
cartesian left rdd.
+if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
+  if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+right.statistics.sizeInBytes <= 
sqlContext.conf.autoBroadcastJoinThreshold) {
--- End diff --

You can use the ```CanBroadcast``` extractor to determine whether a side is 
broadcastable; it also takes the ```broadcast``` hint into account and is 
probably more future proof.
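
Something along these lines (only a sketch of the strategy fragment, assuming 
the ```CanBroadcast``` extractor is accessible from this strategy; the 
size-based fallback would stay as in the diff above):
```
case logical.Join(left, CanBroadcast(right), _, None) =>
  execution.joins.BroadcastCartesianProduct(
    planLater(left), planLater(right), joins.BuildRight) :: Nil
case logical.Join(CanBroadcast(left), right, _, None) =>
  execution.joins.BroadcastCartesianProduct(
    planLater(left), planLater(right), joins.BuildLeft) :: Nil
// ... otherwise fall back to the plain CartesianProduct cases.
```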


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9066][SQL] Improve cartesian performanc...

2015-07-24 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/7417#issuecomment-124562458
  
@Sephiroth-Lin The performance improvement sounds really good. It seems 
like a good thing to put in Spark.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9066][SQL] Improve cartesian performanc...

2015-07-24 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7417#discussion_r35433038
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -213,10 +213,51 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   object CartesianProduct extends Strategy {
 def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
   case logical.Join(left, right, _, None) =>
-execution.joins.CartesianProduct(planLater(left), 
planLater(right)) :: Nil
+// For BroadcastCartesianProduct we will broadcast the small size 
plan,
+// for CartesianProduct we will use the small size plan as 
cartesian left rdd.
+if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
+  if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+right.statistics.sizeInBytes <= 
sqlContext.conf.autoBroadcastJoinThreshold) {
+execution.joins.BroadcastCartesianProduct(planLater(left), 
planLater(right),
+  joins.BuildRight) :: Nil
+  } else {
+execution.joins.CartesianProduct(planLater(left), 
planLater(right),
+  joins.BuildLeft) :: Nil
+  }
+} else {
+  if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+left.statistics.sizeInBytes <= 
sqlContext.conf.autoBroadcastJoinThreshold) {
+execution.joins.BroadcastCartesianProduct(planLater(left), 
planLater(right),
+  joins.BuildLeft) :: Nil
+  } else {
+execution.joins.CartesianProduct(planLater(left), 
planLater(right),
+  joins.BuildRight) :: Nil
+  }
+}
   case logical.Join(left, right, Inner, Some(condition)) =>
-execution.Filter(condition,
-  execution.joins.CartesianProduct(planLater(left), 
planLater(right))) :: Nil
+if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
--- End diff --

This code is almost the same as the code above. I would put it in a method, 
e.g. ```createCartesianProduct```, and wrap the result in a ```Filter``` 
operator, as sketched below. 
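
To make that concrete, a sketch of such a helper (hypothetical name, body 
copied from the branch logic in the diff above):
```
private def createCartesianProduct(
    left: LogicalPlan, right: LogicalPlan): SparkPlan = {
  val threshold = sqlContext.conf.autoBroadcastJoinThreshold
  if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
    if (threshold > 0 && right.statistics.sizeInBytes <= threshold) {
      execution.joins.BroadcastCartesianProduct(
        planLater(left), planLater(right), joins.BuildRight)
    } else {
      execution.joins.CartesianProduct(
        planLater(left), planLater(right), joins.BuildLeft)
    }
  } else {
    if (threshold > 0 && left.statistics.sizeInBytes <= threshold) {
      execution.joins.BroadcastCartesianProduct(
        planLater(left), planLater(right), joins.BuildLeft)
    } else {
      execution.joins.CartesianProduct(
        planLater(left), planLater(right), joins.BuildRight)
    }
  }
}
```
The two ```Join``` cases then reduce to ```createCartesianProduct(left, 
right) :: Nil``` and ```execution.Filter(condition, 
createCartesianProduct(left, right)) :: Nil```.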


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34844383
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +84,661 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY cluase.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param frameType to evaluate. This can either be Row or Range based.
+   * @param offset with respect to the row.
+   * @return a bound ordering object.
+   */
+  private[this] def createBoundOrdering(frameType: FrameType, offset: 
Int): BoundOrdering = {
+frameType match {
+  case RangeFrame =>
+val (exprs, current, bound) = if (offset == 0) {
+  // Use the entire order expression when the offset is 0.
+  val exprs = windowSpec.orderSpec.map(_.child)
+  val projection = newMutableProjection(exprs, child.output)
+  (windowSpec.orderSpec, projection(), projection())
+}
+else if (windowSpec.orderSpec.size == 1) {
+  // Use only the first order expression when the offset is 
non-null.
+  val sortExpr = windowSpec.orderSpec.head
+  val expr = sortExpr.child
+  // Create the projection which returns the current 'value'.
+  val current = newMutableProjection(expr :: Nil, child.output)()
+  // Create the projection which returns the current 'value' 
modified by adding the offset.
+  val boundExpr = Add(expr, Cast(Literal.create(offset, 
IntegerType), expr.dataType))
+  val bound = newMutableProjection(boundExpr :: Nil, 
child.output)()
+  (sortExpr :: Nil, current, bound)
+}
+else {
+  sys.error(Non

[GitHub] spark pull request: [SPARK-8682][SQL][WIP] Range Join

2015-07-17 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7379#discussion_r34891100
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastRangeJoin.scala
 ---
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.sql.catalyst.util.TypeUtils
+import org.apache.spark.util.ThreadUtils
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.concurrent._
+import scala.concurrent.duration._
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+
+/**
+ * Performs an inner range join on two tables. A range join typically has 
the following form:
+ *
+ * SELECT A.*
+ *,B.*
+ * FROM   tableA A
+ *JOIN tableB B
+ * ON A.start <= B.end
+ *  AND A.end > B.start
+ *
+ * The implementation builds a range index from the smaller build side, 
broadcasts this index
+ * to all executors. The streaming side is then matched against the index. 
This reduces the number
+ * of comparisons made by log(n) (n is the number of records in the build 
table) over the
+ * typical solution (Nested Loop Join).
+ *
+ * TODO NaN values
+ * TODO NULL values
+ * TODO Outer joins? StreamSide is quite easy/BuildSide requires 
bookkeeping and
+ * TODO This join will maintain sort order. The build side rows will also 
be added in a lower
+ *  bound sorted fashion.
+ */
+@DeveloperApi
+case class BroadcastRangeJoin(
+leftKeys: Seq[Expression],
+rightKeys: Seq[Expression],
+equality: Seq[Boolean],
+buildSide: BuildSide,
+left: SparkPlan,
+right: SparkPlan)
+  extends BinaryNode {
+
+  private[this] lazy val (buildPlan, streamedPlan) = buildSide match {
+case BuildLeft => (left, right)
+case BuildRight => (right, left)
+  }
+
+  private[this] lazy val (buildKeys, streamedKeys) = buildSide match {
+case BuildLeft => (leftKeys, rightKeys)
+case BuildRight => (rightKeys, leftKeys)
+  }
+
+  override def output: Seq[Attribute] = left.output ++ right.output
+
+  @transient
+  private[this] lazy val buildSideKeyGenerator: Projection =
+newProjection(buildKeys, buildPlan.output)
+
+  @transient
+  private[this] lazy val streamSideKeyGenerator: () => MutableProjection =
+newMutableProjection(streamedKeys, streamedPlan.output)
+
+  private[this] val timeout: Duration = {
+val timeoutValue = sqlContext.conf.broadcastTimeout
+if (timeoutValue < 0) {
+  Duration.Inf
+} else {
+  timeoutValue.seconds
+}
+  }
+
+  // Construct the range index.
+  @transient
+  private[this] val indexBroadcastFuture = future {
+// Deal with equality.
+val Seq(allowLowEqual: Boolean, allowHighEqual: Boolean) = buildSide 
match {
+  case BuildLeft => equality.reverse
+  case BuildRight => equality
+}
+
+// Get the ordering for the datatype.
+val ordering = TypeUtils.getOrdering(buildKeys.head.dataType)
+
+// Note that we use .execute().collect() because we don't want to 
convert data to Scala types
+// TODO find out if the result of a sort and a collect is still sorted.
+val eventifier = RangeIndex.toRangeEvent(buildSideKeyGenerator, 
ordering)
+val events = 
buildPlan.execute().map(_.copy()).collect().flatMap(eventifier)
+
+// Create the index.
+val index = RangeIndex.build(ordering, events, allowLowEqual, 
allowHighEqual)
+
+// Broadcast the index.
+sparkContext.broadcast(index

[GitHub] spark pull request: [SPARK-9066][SQL] Improve cartesian performanc...

2015-07-24 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7417#discussion_r35422390
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import scala.concurrent._
+import scala.concurrent.duration._
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class BroadcastCartesianProduct(
--- End diff --

The inner join variant with (degenerate) condition ```1 = 1``` would do the 
same.

All I am saying is that this is also a way to get a broadcast cartesian 
join going, and it saves some lines of code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8682][SQL][WIP] Range Join

2015-07-14 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/7379#issuecomment-121250819
  
The current test errors are a bit weird; they shouldn't have been caused by 
this change, because the functionality is disabled by default.

Rebased onto the most recent master to see if that helps.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34602077
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +59,622 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
--- End diff --

Ahhh... Now I understand. I got confused with ```outputOrdering```, sorry 
about that. I'll add it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34603151
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +59,622 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
--- End diff --

Tiny follow-up. Now I know why I removed it: 
```org.apache.spark.sql.execution.UnaryNode```, the parent trait of 
```Window```, already implements this in exactly the same way:
```
private[sql] trait UnaryNode extends SparkPlan with 
trees.UnaryNode[SparkPlan] {
  self: Product =>
  override def outputPartitioning: Partitioning = child.outputPartitioning
}
```
So do I still need to add this to the ```Window``` class?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34616177
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +68,645 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY cluase.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
-}
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  // Create window functions.
-  private[this] def windowFunctions(): Array[WindowFunction] = {
-val functions = new 
Array[WindowFunction](computedWindowExpressions.length)
-var i = 0
-while (i < computedWindowExpressions.length) {
-  functions(i) = 
computedWindowExpressions(i).windowFunction.newInstance()
-  functions(i).init()
-  i += 1
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row. There
+   * are two types of boundaries that can be evaluated:
+   * - Row Based: A row based boundary is based on the position of the row 
within the partition.
+   *   The offset indicates the number of rows above or below the current 
row, the frame for the
+   *   current row starts or ends. For instance, given a row based sliding 
frame with a lower bound
+   *   offset of -1 and a upper bound offset of +2. The frame for row with 
index 5 would range from
+   *   index 4 to index 6.
+   * - Range based: A range based boundary is based on the actual value of 
the ORDER BY
+   *   expression(s). The offset is used to alter the value of the ORDER 
BY expression, for
+   *   instance if the current order by expression has a value of 10 and 
the lower bound offset
+   *   is -3, the resulting lower bound for the current row will be 10 - 3 
= 7. This however puts a
+   *   number of constraints on the ORDER

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34617761
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +68,645 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY cluase.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
-}
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  // Create window functions.
-  private[this] def windowFunctions(): Array[WindowFunction] = {
-val functions = new 
Array[WindowFunction](computedWindowExpressions.length)
-var i = 0
-while (i < computedWindowExpressions.length) {
-  functions(i) = 
computedWindowExpressions(i).windowFunction.newInstance()
-  functions(i).init()
-  i += 1
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row. There
+   * are two types of boundaries that can be evaluated:
+   * - Row Based: A row based boundary is based on the position of the row 
within the partition.
+   *   The offset indicates the number of rows above or below the current 
row, the frame for the
+   *   current row starts or ends. For instance, given a row based sliding 
frame with a lower bound
+   *   offset of -1 and a upper bound offset of +2. The frame for row with 
index 5 would range from
+   *   index 4 to index 6.
+   * - Range based: A range based boundary is based on the actual value of 
the ORDER BY
+   *   expression(s). The offset is used to alter the value of the ORDER 
BY expression, for
+   *   instance if the current order by expression has a value of 10 and 
the lower bound offset
+   *   is -3, the resulting lower bound for the current row will be 10 - 3 
= 7. This however puts a
+   *   number of constraints on the ORDER

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34618909
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +68,645 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY cluase.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
-}
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  // Create window functions.
-  private[this] def windowFunctions(): Array[WindowFunction] = {
-val functions = new 
Array[WindowFunction](computedWindowExpressions.length)
-var i = 0
-while (i < computedWindowExpressions.length) {
-  functions(i) = 
computedWindowExpressions(i).windowFunction.newInstance()
-  functions(i).init()
-  i += 1
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row. There
+   * are two types of boundaries that can be evaluated:
+   * - Row Based: A row based boundary is based on the position of the row 
within the partition.
+   *   The offset indicates the number of rows above or below the current 
row, the frame for the
+   *   current row starts or ends. For instance, given a row based sliding 
frame with a lower bound
+   *   offset of -1 and a upper bound offset of +2. The frame for row with 
index 5 would range from
+   *   index 4 to index 6.
+   * - Range based: A range based boundary is based on the actual value of 
the ORDER BY
+   *   expression(s). The offset is used to alter the value of the ORDER 
BY expression, for
+   *   instance if the current order by expression has a value of 10 and 
the lower bound offset
+   *   is -3, the resulting lower bound for the current row will be 10 - 3 
= 7. This however puts a
+   *   number of constraints on the ORDER

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34857571
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala 
---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.{Row, QueryTest}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.test.TestHive.implicits._
+
+/**
+ * Window expressions are tested extensively by the following test suites:
+ * [[org.apache.spark.sql.hive.HiveDataFrameWindowSuite]]
+ * 
[[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryWithoutCodeGenSuite]]
+ * 
[[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryFileWithoutCodeGenSuite]]
+ * However these suites do not cover all possible (i.e. more exotic) 
settings. This suite fill
+ * this gap.
+ *
+ * TODO Move this class to the sql/core project when we move to Native 
Spark UDAFs.
+ */
+class WindowSuite extends QueryTest {
+
+  test("reverse sliding range frame") {
+val df = Seq(
+  (1, "Thin", "Cell Phone", 6000),
+  (2, "Normal", "Tablet", 1500),
+  (3, "Mini", "Tablet", 5500),
+  (4, "Ultra thin", "Cell Phone", 5500),
+  (5, "Very thin", "Cell Phone", 6000),
+  (6, "Big", "Tablet", 2500),
+  (7, "Bendable", "Cell Phone", 3000),
+  (8, "Foldable", "Cell Phone", 3000),
+  (9, "Pro", "Tablet", 4500),
+  (10, "Pro2", "Tablet", 6500)).
+  toDF("id", "product", "category", "revenue")
+checkAnswer(
+  df.select(
+$"id",
+avg($"revenue").over(Window.
+  partitionBy($"category").
+  orderBy($"revenue".desc).
+  rangeBetween(-2000L, 1000L)).
+  cast("int")),
+Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) ::
+  Row(4, 5833) :: Row(5, 5833) :: Row(6, 2000) ::
+  Row(7, 3000) :: Row(8, 3000) :: Row(9, 4166) ::
+  Row(10, 5500) :: Nil)
--- End diff --

Yeah, that was a mistake on my part. I didn't realise that the offsets should 
be flipped when the order is descending (I swapped the high and low offsets).

I pushed a fix.
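
For anyone checking the numbers: judging from the expected answers above, the 
frame for e.g. the Tablet row with revenue 4500 ends up covering the revenues 
2500, 4500 and 5500, so the windowed average is (2500 + 4500 + 5500) / 3 = 
4166.67, which the cast to int turns into the expected 4166.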


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122158569
  
@yhuai the benchmarking results are attached. It might be interesting to 
see how the operator performs on different datasets.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8682][SQL][WIP] Range Join

2015-07-16 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/7379#issuecomment-122177553
  
The = case is quite easy to implement.

This implementation is currently targeted at range joining a rather small 
(broadcastable) table to an arbitrarily large table. I don't think this matches 
the use case of SMJ, i.e. equi-joining arbitrarily large tables. But I might be 
missing something?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8682][SQL][WIP] Range Join

2015-07-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7379#discussion_r34862439
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastRangeJoin.scala
 ---
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.sql.catalyst.util.TypeUtils
+import org.apache.spark.util.ThreadUtils
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.concurrent._
+import scala.concurrent.duration._
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+
+/**
+ * Performs an inner range join on two tables. A range join typically has 
the following form:
+ *
+ * SELECT A.*
+ *,B.*
+ * FROM   tableA A
+ *JOIN tableB B
+ * ON A.start <= B.end
+ *  AND A.end > B.start
+ *
+ * The implementation builds a range index from the smaller build side, 
broadcasts this index
+ * to all executors. The streaming side is then matched against the index. 
This reduces the number
+ * of comparisons made by log(n) (n is the number of records in the build 
table) over the
+ * typical solution (Nested Loop Join).
+ *
+ * TODO NaN values
+ * TODO NULL values
+ * TODO Outer joins? StreamSide is quite easy/BuildSide requires 
bookkeeping and
+ * TODO This join will maintain sort order. The build side rows will also 
be added in a lower
+ *  bound sorted fashion.
+ */
+@DeveloperApi
+case class BroadcastRangeJoin(
+leftKeys: Seq[Expression],
+rightKeys: Seq[Expression],
+equality: Seq[Boolean],
+buildSide: BuildSide,
+left: SparkPlan,
+right: SparkPlan)
+  extends BinaryNode {
+
+  private[this] lazy val (buildPlan, streamedPlan) = buildSide match {
+case BuildLeft => (left, right)
+case BuildRight => (right, left)
+  }
+
+  private[this] lazy val (buildKeys, streamedKeys) = buildSide match {
+case BuildLeft => (leftKeys, rightKeys)
+case BuildRight => (rightKeys, leftKeys)
+  }
+
+  override def output: Seq[Attribute] = left.output ++ right.output
+
+  @transient
+  private[this] lazy val buildSideKeyGenerator: Projection =
+newProjection(buildKeys, buildPlan.output)
+
+  @transient
+  private[this] lazy val streamSideKeyGenerator: () => MutableProjection =
+newMutableProjection(streamedKeys, streamedPlan.output)
+
+  private[this] val timeout: Duration = {
+val timeoutValue = sqlContext.conf.broadcastTimeout
+if (timeoutValue < 0) {
+  Duration.Inf
+} else {
+  timeoutValue.seconds
+}
+  }
+
+  // Construct the range index.
+  @transient
+  private[this] val indexBroadcastFuture = future {
+// Deal with equality.
+val Seq(allowLowEqual: Boolean, allowHighEqual: Boolean) = buildSide 
match {
+  case BuildLeft => equality.reverse
+  case BuildRight => equality
+}
+
+// Get the ordering for the datatype.
+val ordering = TypeUtils.getOrdering(buildKeys.head.dataType)
+
+// Note that we use .execute().collect() because we don't want to 
convert data to Scala types
+// TODO find out if the result of a sort and a collect is still sorted.
+val eventifier = RangeIndex.toRangeEvent(buildSideKeyGenerator, 
ordering)
+val events = 
buildPlan.execute().map(_.copy()).collect().flatMap(eventifier)
+
+// Create the index.
+val index = RangeIndex.build(ordering, events, allowLowEqual, 
allowHighEqual)
+
+// Broadcast the index.
+sparkContext.broadcast(index

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34844502
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +68,645 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY clause.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
-}
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  // Create window functions.
-  private[this] def windowFunctions(): Array[WindowFunction] = {
-val functions = new 
Array[WindowFunction](computedWindowExpressions.length)
-var i = 0
-while (i < computedWindowExpressions.length) {
-  functions(i) = 
computedWindowExpressions(i).windowFunction.newInstance()
-  functions(i).init()
-  i += 1
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row. There
+   * are two types of boundaries that can be evaluated:
+   * - Row Based: A row based boundary is based on the position of the row 
within the partition.
+   *   The offset indicates the number of rows above or below the current 
row, the frame for the
+   *   current row starts or ends. For instance, given a row based sliding 
frame with a lower bound
+   *   offset of -1 and an upper bound offset of +2. The frame for row with index 5
+   *   would range from index 4 to index 7.
+   * - Range based: A range based boundary is based on the actual value of 
the ORDER BY
+   *   expression(s). The offset is used to alter the value of the ORDER 
BY expression, for
+   *   instance if the current order by expression has a value of 10 and 
the lower bound offset
+   *   is -3, the resulting lower bound for the current row will be 10 - 3 
= 7. This however puts a
+   *   number of constraints on the ORDER
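
To make the two boundary types a bit more concrete, here is a rough sketch (illustrative
only; ```RowBound```/```RangeBound``` are not the PR's actual classes, and the real code
works on projections of InternalRows rather than plain longs):

```
// Compare an input row against a frame bound of an output row.
// Row based: positions only. Range based: ORDER BY value shifted by an offset.
sealed trait FrameBound {
  /** < 0: input row lies before the bound, 0: on the bound, > 0: after it. */
  def compare(inputIndex: Int, inputValue: Long, outputIndex: Int, outputValue: Long): Int
}

// ROWS BETWEEN n PRECEDING AND m FOLLOWING: pure position arithmetic.
case class RowBound(offset: Int) extends FrameBound {
  def compare(inputIndex: Int, inputValue: Long, outputIndex: Int, outputValue: Long): Int =
    inputIndex - (outputIndex + offset)
}

// RANGE BETWEEN n PRECEDING AND m FOLLOWING: shift the ORDER BY value of the output row.
case class RangeBound(offset: Long) extends FrameBound {
  def compare(inputIndex: Int, inputValue: Long, outputIndex: Int, outputValue: Long): Int =
    java.lang.Long.compare(inputValue, outputValue + offset)
}

object FrameBoundExample extends App {
  // A ROWS frame [-1, +2] around output row 5 covers input rows 4 to 7.
  val lower = RowBound(-1)
  val upper = RowBound(+2)
  val inFrame = (0 to 9).filter { i =>
    lower.compare(i, 0L, 5, 0L) >= 0 && upper.compare(i, 0L, 5, 0L) <= 0
  }
  println(inFrame) // Vector(4, 5, 6, 7)
}
```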

[GitHub] spark pull request: [SPARK-8682][SQL][WIP] Range Join

2015-07-17 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/7379#issuecomment-12257
  
No problem.

### Supporting N-Ary Predicates.
In order to make the range join work we need the predicates to define a single 
interval for each side of the join. For instance the clause: ```a.low < b.high && 
b.low < a.high``` implies that there are two intervals: [a.low, a.high] and 
[b.low, b.high]. An open interval, for instance ```a.low < b.high```, would also work.
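
As a small, self-contained illustration of the binary case (the names are made up):

```
// Two intervals overlap exactly when each one starts before the other one ends;
// this is the single-interval-per-side shape the range join can index.
case class Interval(low: Long, high: Long)

def overlaps(a: Interval, b: Interval): Boolean =
  a.low < b.high && b.low < a.high

assert(overlaps(Interval(0, 10), Interval(5, 15)))   // [0,10] and [5,15] overlap
assert(!overlaps(Interval(0, 10), Interval(12, 20))) // [0,10] and [12,20] do not
```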

When we use more than two clauses, we can potentially have multiple intervals. In 
your example, for instance, ```a.key < b.key and a.key2 > b.key2 and a.key3 = b.key3``` 
would yield the following intervals: [a.key1, a.key2], [a.key1, a.key3], [b.key2, b.key1] 
and [b.key2, b.key3]. Creating a working index that can deal with the (partially) 
uncorrelated intervals will be quite a challenge (I haven't really looked into this 
yet). We could of course pick one pair of intervals to join on and use filtering to 
take care of the rest.

I think the Unary and Binary cases are the most common. Let's start there, 
and see if there is demand for N-ary designs.

### Generalization
If you consider the fact that we are joining intervals (ranges if you will), range 
partitioning will not work, because it assumes both intervals lie entirely in the same 
partition (they can span multiple partitions). When dealing with larger tables we would 
have to use a special interval-aware partitioning: this would create partitions for a 
number of fully covering, non-overlapping intervals, and would multicast each row to 
every interval it belongs to. The subsequent step would be using an index or doing a 
cartesian/BNL join.

Doing a Cartesian Join in a single partition performs horribly. I thought it 
wouldn't be a problem either, but this completely killed the performance of an 
analysis I was doing for a client (account balances at specific dates). 

I do see opportunities for code re-use, but this would mean generalizing 
HashedRelation and the BroadCast join family.
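
A rough sketch of the multicast idea, with made-up bucket boundaries (illustration
only, not an actual implementation):

```
// Multicast each row to every fixed-width bucket its interval touches, so rows with
// overlapping intervals are guaranteed to meet in at least one bucket.
object IntervalBuckets extends App {
  case class IntervalRow(low: Long, high: Long, payload: String)

  val bucketWidth = 100L

  def buckets(r: IntervalRow): Seq[(Long, IntervalRow)] =
    (r.low / bucketWidth to r.high / bucketWidth).map(b => (b, r))

  // An interval spanning two buckets is emitted twice, once per bucket id.
  println(buckets(IntervalRow(80, 120, "x")))
  // In Spark this would be a flatMap on both sides, a partitioning on the bucket id,
  // a per-bucket join, and a de-duplication of pairs matched in more than one bucket.
}
```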


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/7513#issuecomment-122707719
  
@yhuai Don't think jenkins picked up your OK.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/7513#issuecomment-122674419
  
cc @yhuai 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread hvanhovell
GitHub user hvanhovell opened a pull request:

https://github.com/apache/spark/pull/7513

[SPARK-8638] [SQL] Window Function Performance Improvements - Cleanup

This PR contains a few clean-ups that are a part of SPARK-8638: a few style 
issues got fixed, and a few tests were moved.

Git commit message is wrong BTW :(...

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-8638-cleanup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7513.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7513


commit 4e69d08ab0292220a3cf67d3011947f80f931ed7
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-07-19T15:52:28Z

Fixed Perfomance Regression for Shrinking Window Frames (+Rebase)




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8640] [SQL] Enable Processing of Multip...

2015-07-20 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/7515#issuecomment-122912351
  
cc @yhuai 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8682][SQL][WIP] Range Join

2015-07-13 Thread hvanhovell
GitHub user hvanhovell opened a pull request:

https://github.com/apache/spark/pull/7379

[SPARK-8682][SQL][WIP] Range Join

*...copied from JIRA (SPARK-8682):*

Currently Spark SQL uses a Broadcast Nested Loop join (or a filtered 
Cartesian Join) when it has to execute the following range query:
```
SELECT A.*,
       B.*
FROM   tableA A
       JOIN tableB B
        ON A.start <= B.end
         AND A.end > B.start
```
This is horribly inefficient. The performance of this query can be greatly 
improved, when one of the tables can be broadcasted, by creating a range index. 
A range index is basically a sorted map containing the rows of the smaller 
table, indexed by both the high and low keys. using this structure the 
complexity of the query would go from O(N * M) to O(N * 2 * LOG(M)), N = number 
of records in the larger table, M = number of records in the smaller (indexed) 
table.

This is currently a work in progress. I will be adding more tests and a 
small benchmark in the next couple of days. If you want to try this out, set 
the ```spark.sql.planner.rangeJoin``` option to ```true``` in the SQL 
configuration.
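
For anyone trying the branch out, a minimal usage sketch (assuming the 1.4-era
SQLContext API and two registered temp tables with start/end columns):

```
// Enable the experimental planner rule and run an overlap query.
sqlContext.setConf("spark.sql.planner.rangeJoin", "true")

val joined = sqlContext.sql("""
  SELECT A.*, B.*
  FROM   tableA A
         JOIN tableB B
          ON A.start <= B.end
         AND A.end > B.start
""")
joined.show()
```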

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-8682

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7379.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7379


commit a2ff5dd2c54ca00784fc529bea2da2f05897786b
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-07-01T02:17:03Z

Initial Range Join commit: Compiles  Style Checks work.

commit d2bd7932a2f15a41e39aca1d9fad3441b85fae44
Author: Herman van Hovell hvanhov...@questtec.nl
Date:   2015-07-13T22:27:03Z

Added Tests for Range Index. Ton of Bug Fixes.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9066][SQL] Improve cartesian performanc...

2015-07-21 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/7417#issuecomment-123296087
  
Do you have any benchmarking results for this? Would be great to see how 
much this improves the current situation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9066][SQL] Improve cartesian performanc...

2015-07-21 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7417#discussion_r35098517
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import scala.concurrent._
+import scala.concurrent.duration._
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class BroadcastCartesianProduct(
--- End diff --

How is this different from a 
[BroadcastNestedLoopJoin](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9066][SQL] Improve cartesian performanc...

2015-07-15 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7417#discussion_r34681979
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
 ---
@@ -34,7 +34,15 @@ case class CartesianProduct(left: SparkPlan, right: 
SparkPlan) extends BinaryNod
 val leftResults = left.execute().map(_.copy())
 val rightResults = right.execute().map(_.copy())
 
-leftResults.cartesian(rightResults).mapPartitions { iter =
+val cartesianRdd = if (leftResults.partitions.size  
rightResults.partitions.size) {
+  rightResults.cartesian(leftResults).mapPartitions { iter =
+iter.map(tuple = (tuple._2, tuple._1))
+  }
+} else {
+  leftResults.cartesian(rightResults)
+}
+
+cartesianRdd.mapPartitions { iter =
   val joinedRow = new JoinedRow
--- End diff --

Quick question. Why not use the ```sizeInBytes```? I assume we want to move 
as little data as possible? Using ```sizeInBytes``` would be a bit more 
involved, since this would involve the planner, and (probably) adding a 
```BuildSide``` parameter to ```CartesianProduct```...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9730][SQL] Add Full Outer Join support ...

2015-08-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/8383#discussion_r37883244
  
--- Diff: 
unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSetMethods.java ---
@@ -68,6 +68,19 @@ public static boolean isSet(Object baseObject, long 
baseOffset, int index) {
   }
 
   /**
+   * Returns {@code true} if all bits are set.
+   */
+  public static boolean allSet(Object baseObject, long baseOffset, long 
bitSetWidthInWords) {
+long addr = baseOffset;
+for (int i = 0; i  bitSetWidthInWords; i++, addr += WORD_SIZE) {
+  if (Platform.getLong(baseObject, addr) == 0) {
--- End diff --

This confuses me. It seems that you are checking whether all bits in a given 
word are not set (0), instead of checking whether any one of the bits is not set. Am 
I missing something?
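
For what it's worth, a hedged sketch of the check the comment is asking for (plain
Scala, ignoring the Platform/unsafe access and any masking of the last partial word):

```
// "All bits set" over whole 64-bit words: every word must be all ones.
// Testing a word against 0 only detects a completely empty word, which is a
// different (and much weaker) condition.
def allSet(words: Array[Long], numWords: Int): Boolean = {
  var i = 0
  while (i < numWords) {
    if (words(i) != -1L) return false // at least one bit in this word is 0
    i += 1
  }
  true
}
```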


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-10100] [SQL] Perfomance improvements to...

2015-08-25 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/8298#issuecomment-134620114
  
@adrian-wang the improvement is absolutely tiny, about 2-3% if you do a lot 
of ```min```'s or ```max```'es.

This PR was a response to a misdiagnosed performance regression; the real 
cause was the use of a map in key-less aggregation. The PR still adds some value, so we 
could add it to 1.6. @yhuai / @adrian-wang if you feel differently about this, 
I'll withdraw the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9241] [SQL] [WIP] Supporting multiple D...

2015-10-26 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/9280#discussion_r43067869
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/distinctFallback.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import 
org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, 
CodeGenContext, CodegenFallback}
+import org.apache.spark.sql.types.{AbstractDataType, DataType}
+import org.apache.spark.util.collection.OpenHashSet
+
+/**
+ * Fallback operator for distinct operators. This will be used when a user 
issues multiple
+ * different distinct expressions in a query.
+ *
+ * The operator uses the OpenHashSetUDT for de-duplicating values. It is, 
as a result, not possible
+ * to use UnsafeRow based aggregation.
+ */
+case class DistinctAggregateFallback(function: AggregateFunction2) extends 
DeclarativeAggregate {
+  override def inputTypes: Seq[AbstractDataType] = function.inputTypes
+  override def nullable: Boolean = function.nullable
+  override def dataType: DataType = function.dataType
+  override def children: Seq[Expression] = Seq(function)
+
+  private[this] val input = function.children match {
+case child :: Nil => child
+case children => CreateStruct(children) // TODO can we test this?
+  }
+  private[this] val items = AttributeReference("itemSet", new 
OpenHashSetUDT(input.dataType))()
+
+  override def aggBufferAttributes: Seq[AttributeReference] = Seq(items)
+  override val initialValues: Seq[Expression] = Seq(NewSet(input.dataType))
+  override val updateExpressions: Seq[Expression] = 
Seq(AddItemToSet(input, items))
+  override val mergeExpressions: Seq[Expression] = 
Seq(CombineSets(items.left, items.right))
+  override val evaluateExpression: Expression = function match {
+case f: Count => CountSet(items)
+case f: DeclarativeAggregate => 
ReduceSetUsingDeclarativeAggregate(items, f)
+case f: ImperativeAggregate => 
ReduceSetUsingImperativeAggregate(items, f)
+  }
+}
+
+case class ReduceSetUsingImperativeAggregate(left: Expression, right: 
ImperativeAggregate)
+  extends BinaryExpression with CodegenFallback {
+
+  override def dataType: DataType = right.dataType
+
+  private[this] val single = right.children.size == 1
+
+  // TODO can we assume that the offsets are 0 when we haven't touched 
them yet?
+  private[this] val function = right
+.withNewInputAggBufferOffset(0)
+.withNewMutableAggBufferOffset(0)
+
+  @transient private[this] lazy val buffer =
+new SpecificMutableRow(right.aggBufferAttributes.map(_.dataType))
+
+  @transient private[this] lazy val singleValueInput = new 
GenericMutableRow(1)
+
+  override def eval(input: InternalRow): Any = {
+val result = left.eval(input).asInstanceOf[OpenHashSet[Any]]
+if (result != null) {
+  right.initialize(buffer)
+  val iterator = result.iterator
+  if (single) {
+while (iterator.hasNext) {
+  singleValueInput.update(0, iterator.next())
+  function.update(buffer, singleValueInput)
+}
+  } else {
+while (iterator.hasNext) {
+  function.update(buffer, 
iterator.next().asInstanceOf[InternalRow])
+}
+  }
+  function.eval(buffer)
+} else null
+  }
+}
+
+case class ReduceSetUsingDeclarativeAggregate(left: Expression, right: 
DeclarativeAggregate)
+  extends Expression with CodegenFallback {
+  override def children: Seq[Expression] = Seq(left)
+  override def nullable: Boolean = right.nullable
+  override def dataType: DataType = right.d

[GitHub] spark pull request: [SPARK-9241] [SQL] [WIP] Supporting multiple D...

2015-10-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/9280#discussion_r43116722
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/distinctFallback.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import 
org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, 
CodeGenContext, CodegenFallback}
+import org.apache.spark.sql.types.{AbstractDataType, DataType}
+import org.apache.spark.util.collection.OpenHashSet
+
+/**
+ * Fallback operator for distinct operators. This will be used when a user 
issues multiple
+ * different distinct expressions in a query.
+ *
+ * The operator uses the OpenHashSetUDT for de-duplicating values. It is, 
as a result, not possible
+ * to use UnsafeRow based aggregation.
+ */
+case class DistinctAggregateFallback(function: AggregateFunction2) extends 
DeclarativeAggregate {
+  override def inputTypes: Seq[AbstractDataType] = function.inputTypes
+  override def nullable: Boolean = function.nullable
+  override def dataType: DataType = function.dataType
+  override def children: Seq[Expression] = Seq(function)
+
+  private[this] val input = function.children match {
+case child :: Nil => child
+case children => CreateStruct(children) // TODO can we test this?
+  }
+  private[this] val items = AttributeReference("itemSet", new 
OpenHashSetUDT(input.dataType))()
+
+  override def aggBufferAttributes: Seq[AttributeReference] = Seq(items)
+  override val initialValues: Seq[Expression] = Seq(NewSet(input.dataType))
+  override val updateExpressions: Seq[Expression] = 
Seq(AddItemToSet(input, items))
+  override val mergeExpressions: Seq[Expression] = 
Seq(CombineSets(items.left, items.right))
+  override val evaluateExpression: Expression = function match {
+case f: Count => CountSet(items)
+case f: DeclarativeAggregate => 
ReduceSetUsingDeclarativeAggregate(items, f)
+case f: ImperativeAggregate => 
ReduceSetUsingImperativeAggregate(items, f)
+  }
+}
+
+case class ReduceSetUsingImperativeAggregate(left: Expression, right: 
ImperativeAggregate)
+  extends BinaryExpression with CodegenFallback {
+
+  override def dataType: DataType = right.dataType
+
+  private[this] val single = right.children.size == 1
+
+  // TODO can we assume that the offsets are 0 when we haven't touched 
them yet?
+  private[this] val function = right
+.withNewInputAggBufferOffset(0)
+.withNewMutableAggBufferOffset(0)
+
+  @transient private[this] lazy val buffer =
+new SpecificMutableRow(right.aggBufferAttributes.map(_.dataType))
+
+  @transient private[this] lazy val singleValueInput = new 
GenericMutableRow(1)
+
+  override def eval(input: InternalRow): Any = {
+val result = left.eval(input).asInstanceOf[OpenHashSet[Any]]
+if (result != null) {
+  right.initialize(buffer)
+  val iterator = result.iterator
+  if (single) {
+while (iterator.hasNext) {
+  singleValueInput.update(0, iterator.next())
+  function.update(buffer, singleValueInput)
+}
+  } else {
+while (iterator.hasNext) {
+  function.update(buffer, 
iterator.next().asInstanceOf[InternalRow])
+}
+  }
+  function.eval(buffer)
+} else null
+  }
+}
+
+case class ReduceSetUsingDeclarativeAggregate(left: Expression, right: 
DeclarativeAggregate)
+  extends Expression with CodegenFallback {
+  override def children: Seq[Expression] = Seq(left)
+  override def nullable: Boolean = right.nullable
+  override def dataType: DataType = right.d

[GitHub] spark pull request: [SPARK-9298][SQL] Add pearson correlation aggr...

2015-10-29 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/8587#discussion_r43439837
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
 ---
@@ -524,6 +525,133 @@ case class Sum(child: Expression) extends 
DeclarativeAggregate {
   override val evaluateExpression = Cast(currentSum, resultType)
 }
 
+/**
+ * Compute Pearson correlation between two expressions.
+ * When applied on empty data (i.e., count is zero), it returns NaN.
+ *
+ * Definition of Pearson correlation can be found at
+ * 
http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
+ *
+ * @param left one of the expressions to compute correlation with.
+ * @param right another expression to compute correlation with.
+ */
+case class Corr(
+left: Expression,
+right: Expression,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0)
+  extends ImperativeAggregate {
+
+  def children: Seq[Expression] = Seq(left, right)
+
+  def nullable: Boolean = false
+
+  def dataType: DataType = DoubleType
+
+  def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
+
+  def aggBufferSchema: StructType = 
StructType.fromAttributes(aggBufferAttributes)
+
+  def inputAggBufferAttributes: Seq[AttributeReference] = 
aggBufferAttributes.map(_.newInstance())
+
+  val aggBufferAttributes: Seq[AttributeReference] = Seq(
+AttributeReference("xAvg", DoubleType)(),
+AttributeReference("yAvg", DoubleType)(),
+AttributeReference("Ck", DoubleType)(),
+AttributeReference("MkX", DoubleType)(),
+AttributeReference("MkY", DoubleType)(),
+AttributeReference("count", LongType)())
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: 
Int): ImperativeAggregate =
+copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): 
ImperativeAggregate =
+copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override def initialize(buffer: MutableRow): Unit = {
+(0 until 5).map(idx => buffer.setDouble(mutableAggBufferOffset + idx, 
0.0))
+buffer.setLong(mutableAggBufferOffset + 5, 0L)
+  }
+
+  override def update(buffer: MutableRow, input: InternalRow): Unit = {
+val x = left.eval(input).asInstanceOf[Double]
+val y = right.eval(input).asInstanceOf[Double]
+
+var xAvg = buffer.getDouble(mutableAggBufferOffset)
+var yAvg = buffer.getDouble(mutableAggBufferOffset + 1)
+var Ck = buffer.getDouble(mutableAggBufferOffset + 2)
+var MkX = buffer.getDouble(mutableAggBufferOffset + 3)
+var MkY = buffer.getDouble(mutableAggBufferOffset + 4)
+var count = buffer.getLong(mutableAggBufferOffset + 5)
+
+val deltaX = x - xAvg
+val deltaY = y - yAvg
+count += 1
+xAvg += deltaX / count
+yAvg += deltaY / count
+Ck += deltaX * (y - yAvg)
+MkX += deltaX * (x - xAvg)
+MkY += deltaY * (y - yAvg)
+
+buffer.setDouble(mutableAggBufferOffset, xAvg)
+buffer.setDouble(mutableAggBufferOffset + 1, yAvg)
+buffer.setDouble(mutableAggBufferOffset + 2, Ck)
+buffer.setDouble(mutableAggBufferOffset + 3, MkX)
+buffer.setDouble(mutableAggBufferOffset + 4, MkY)
+buffer.setLong(mutableAggBufferOffset + 5, count)
+  }
+
+  // Merge counters from other partitions. Formula can be found at:
+  // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
+  override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
+val count2 = buffer2.getLong(inputAggBufferOffset + 5)
+
+if (count2 > 0) {
--- End diff --

Thanks for the comment. Now it is obvious, I wasn't thinking...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9298][SQL] Add pearson correlation aggr...

2015-10-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/8587#discussion_r43162271
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
 ---
@@ -524,6 +525,133 @@ case class Sum(child: Expression) extends 
DeclarativeAggregate {
   override val evaluateExpression = Cast(currentSum, resultType)
 }
 
+/**
+ * Compute Pearson correlation between two expressions.
+ * When applied on empty data (i.e., count is zero), it returns NaN.
+ *
+ * Definition of Pearson correlation can be found at
+ * 
http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
+ *
+ * @param left one of the expressions to compute correlation with.
+ * @param right another expression to compute correlation with.
+ */
+case class Corr(
+left: Expression,
+right: Expression,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0)
+  extends ImperativeAggregate {
+
+  def children: Seq[Expression] = Seq(left, right)
+
+  def nullable: Boolean = false
+
+  def dataType: DataType = DoubleType
+
+  def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
+
+  def aggBufferSchema: StructType = 
StructType.fromAttributes(aggBufferAttributes)
+
+  def inputAggBufferAttributes: Seq[AttributeReference] = 
aggBufferAttributes.map(_.newInstance())
+
+  val aggBufferAttributes: Seq[AttributeReference] = Seq(
+AttributeReference("xAvg", DoubleType)(),
+AttributeReference("yAvg", DoubleType)(),
+AttributeReference("Ck", DoubleType)(),
+AttributeReference("MkX", DoubleType)(),
+AttributeReference("MkY", DoubleType)(),
+AttributeReference("count", LongType)())
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: 
Int): ImperativeAggregate =
+copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): 
ImperativeAggregate =
+copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override def initialize(buffer: MutableRow): Unit = {
+(0 until 5).map(idx => buffer.setDouble(mutableAggBufferOffset + idx, 
0.0))
+buffer.setLong(mutableAggBufferOffset + 5, 0L)
+  }
+
+  override def update(buffer: MutableRow, input: InternalRow): Unit = {
+val x = left.eval(input).asInstanceOf[Double]
+val y = right.eval(input).asInstanceOf[Double]
+
+var xAvg = buffer.getDouble(mutableAggBufferOffset)
+var yAvg = buffer.getDouble(mutableAggBufferOffset + 1)
+var Ck = buffer.getDouble(mutableAggBufferOffset + 2)
+var MkX = buffer.getDouble(mutableAggBufferOffset + 3)
+var MkY = buffer.getDouble(mutableAggBufferOffset + 4)
+var count = buffer.getLong(mutableAggBufferOffset + 5)
+
+val deltaX = x - xAvg
+val deltaY = y - yAvg
+count += 1
+xAvg += deltaX / count
+yAvg += deltaY / count
+Ck += deltaX * (y - yAvg)
+MkX += deltaX * (x - xAvg)
+MkY += deltaY * (y - yAvg)
+
+buffer.setDouble(mutableAggBufferOffset, xAvg)
+buffer.setDouble(mutableAggBufferOffset + 1, yAvg)
+buffer.setDouble(mutableAggBufferOffset + 2, Ck)
+buffer.setDouble(mutableAggBufferOffset + 3, MkX)
+buffer.setDouble(mutableAggBufferOffset + 4, MkY)
+buffer.setLong(mutableAggBufferOffset + 5, count)
+  }
+
+  // Merge counters from other partitions. Formula can be found at:
+  // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
+  override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
+val count2 = buffer2.getLong(inputAggBufferOffset + 5)
+
+if (count2 > 0) {
--- End diff --

Is it safe to assume that the ```count2``` in ```buffer1``` is non zero? 
There is - currently - no documentation on this.
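
For reference (not part of the PR text), these are the standard pairwise formulas
behind such a merge, writing n_1, n_2 for the two counts, \delta_x = \bar{x}_2 - \bar{x}_1
and \delta_y = \bar{y}_2 - \bar{y}_1:

```
n       = n_1 + n_2
\bar{x} = \bar{x}_1 + \delta_x \, n_2 / n
\bar{y} = \bar{y}_1 + \delta_y \, n_2 / n
C_k     = C_{k,1} + C_{k,2} + \delta_x \delta_y \, n_1 n_2 / n
M_{kX}  = M_{kX,1} + M_{kX,2} + \delta_x^2 \, n_1 n_2 / n
M_{kY}  = M_{kY,1} + M_{kY,2} + \delta_y^2 \, n_1 n_2 / n
corr    = C_k / \sqrt{M_{kX} \, M_{kY}}
```

With these formulas an n_2 = 0 merge is a no-op (every delta term carries a factor of
n_2 / n), so a guard on the incoming count mainly skips needless work.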


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11388][Build]Fix self closing tags.

2015-10-28 Thread hvanhovell
GitHub user hvanhovell opened a pull request:

https://github.com/apache/spark/pull/9339

[SPARK-11388][Build]Fix self closing tags.

Java 8 javadoc does not like self closing tags: `<p/>`, `<br/>`, ...

This PR fixes those.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-11388

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/9339.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #9339


commit 7d8052830cdf6456e4a8e3233c943bccf595dc9d
Author: Herman van Hovell <hvanhov...@questtec.nl>
Date:   2015-10-28T21:00:09Z

Fix self closing tags.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11388][Build]Fix self closing tags.

2015-10-29 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/9339#issuecomment-152135261
  
jenkins retest this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11388][Build]Fix self closing tags.

2015-10-29 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/9339#issuecomment-152135237
  
Hmmm, test fails with the following beautiful error:

[error] (core/test:test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 4809 s, completed Oct 29, 2015 2:32:35 AM

Retesting.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9241] [SQL] [WIP] Supporting multiple D...

2015-10-26 Thread hvanhovell
GitHub user hvanhovell opened a pull request:

https://github.com/apache/spark/pull/9280

[SPARK-9241] [SQL] [WIP] Supporting multiple DISTINCT columns

This PR adds support for multiple distinct columns to the new aggregation 
code path. 

The implementation uses the ```OpenHashSet``` class and set expressions. As 
a result we can only use the slower sort-based aggregation code path. This also 
means the code will probably be slower than the old hash aggregation.

The PR is currently in the proof of concept phase, and I have submitted it 
to get some feedback to see if I am headed in the right direction. I'll add 
more tests if this is considered to be the way to go.

An example using the new code path:

val df = sqlContext
  .range(1 << 25)
  .select(
$"id".as("employee_id"),
(rand(6321782L) * 4 + 1).cast("int").as("department_id"),
when(rand(981293L) >= 0.5, "M").otherwise("F").as("gender"),
(rand(7123L) * 3 + 1).cast("int").as("education_level")
  )

df.registerTempTable("employee")

// Regular query.
sql("""
select   department_id as d,
 count(distinct gender, education_level) as c0,
 count(distinct gender) as c1,
 count(distinct education_level) as c2
from employee
group by department_id
""").show()

cc @yhuai 



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-9241

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/9280.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #9280


commit 256e1f6902b8adbc304c6e287d7cfdf2ef97b12b
Author: Herman van Hovell <hvanhov...@questtec.nl>
Date:   2015-10-26T12:46:33Z

Created distinct fallback mechanism.

commit 6a87384de8d934327ead72daf7210e29be8687b6
Author: Herman van Hovell <hvanhov...@questtec.nl>
Date:   2015-10-26T13:35:01Z

Added fallback distinct creation to aggregate conversion.

commit 3bd6db5390dee044ab4673e38329f584b0436a66
Author: Herman van Hovell <hvanhov...@questtec.nl>
Date:   2015-10-26T15:07:22Z

Fix style. Fix CG for OpenHashSetUDT. Fix bug.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11594][SQL][REPL] Cannot create UDAF in...

2015-11-09 Thread hvanhovell
GitHub user hvanhovell opened a pull request:

https://github.com/apache/spark/pull/9568

[SPARK-11594][SQL][REPL] Cannot create UDAF in REPL

This PR enables users to create a UDAF in the REPL without getting a 
```java.lang.InternalError```.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-11594

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/9568.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #9568


commit 401230c51ad93425c90ec3c58746dbfba154410c
Author: Herman van Hovell <hvanhov...@questtec.nl>
Date:   2015-11-09T13:21:29Z

Enable the creation of a UDAF in the REPL.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9830] [SQL] Remove AggregateExpression1...

2015-11-09 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/9556#discussion_r44269743
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala ---
@@ -141,40 +141,46 @@ class WindowSpec private[sql](
*/
   private[sql] def withAggregate(aggregate: Column): Column = {
 val windowExpr = aggregate.expr match {
-  case Average(child) => WindowExpression(
-UnresolvedWindowFunction("avg", child :: Nil),
-WindowSpecDefinition(partitionSpec, orderSpec, frame))
-  case Sum(child) => WindowExpression(
-UnresolvedWindowFunction("sum", child :: Nil),
-WindowSpecDefinition(partitionSpec, orderSpec, frame))
-  case Count(child) => WindowExpression(
-UnresolvedWindowFunction("count", child :: Nil),
-WindowSpecDefinition(partitionSpec, orderSpec, frame))
-  case First(child, ignoreNulls) => WindowExpression(
-// TODO this is a hack for Hive UDAF first_value
-UnresolvedWindowFunction(
-  "first_value",
-  child :: ignoreNulls :: Nil),
-WindowSpecDefinition(partitionSpec, orderSpec, frame))
-  case Last(child, ignoreNulls) => WindowExpression(
-// TODO this is a hack for Hive UDAF last_value
-UnresolvedWindowFunction(
-  "last_value",
-  child :: ignoreNulls :: Nil),
-WindowSpecDefinition(partitionSpec, orderSpec, frame))
-  case Min(child) => WindowExpression(
-UnresolvedWindowFunction("min", child :: Nil),
-WindowSpecDefinition(partitionSpec, orderSpec, frame))
-  case Max(child) => WindowExpression(
-UnresolvedWindowFunction("max", child :: Nil),
-WindowSpecDefinition(partitionSpec, orderSpec, frame))
-  case wf: WindowFunction => WindowExpression(
-wf,
-WindowSpecDefinition(partitionSpec, orderSpec, frame))
+  case AggregateExpression(aggregateFunction, _, isDistinct) if 
!isDistinct =>
+aggregateFunction match {
--- End diff --

Window functions also need to be wrapped in an ```AggregateExpression``` in 
```functions.scala``` for this to work. It currently fails 
HiveDataFrameWindowSuite due to this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11594][SQL][REPL] Cannot create UDAF in...

2015-11-09 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/9568#discussion_r44273796
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala ---
@@ -129,6 +129,13 @@ abstract class UserDefinedAggregateFunction extends 
Serializable {
 isDistinct = true)
 Column(aggregateExpression)
   }
+
+  /**
+* The name of the UDAF. This is currently the simple name of class. 
This can create an
+* [[java.lang.InternalError]] if the UDAF class was created in the 
REPL; override this method
+* in these cases.
+*/
+  def name: String = getClass.getSimpleName
--- End diff --

I am not catching a ```java.lang.InternalError``` here, because I think 
these shouldn't be caught at all (the error could have a different cause). We enable 
the user to redefine the name to work around this problem.
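
For completeness, a sketch of how a REPL user would work around it with the proposed
hook (the ```name``` override is this PR's proposal, not a stock API; the rest is just
a stub UDAF):

```
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// For classes defined in the REPL, getClass.getSimpleName can throw
// java.lang.InternalError ("Malformed class name"), hence the override.
class LongSum extends UserDefinedAggregateFunction {
  override def name: String = "longSum" // hook proposed in this PR

  def inputSchema: StructType = StructType(StructField("value", LongType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("sum", LongType) :: Nil)
  def dataType: DataType = LongType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getLong(0) + input.getLong(0)
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  def evaluate(buffer: Row): Any = buffer.getLong(0)
}
```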


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9830] [SQL] Remove AggregateExpression1...

2015-11-09 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/9556#issuecomment-155054384
  
Made a quick pass. I think the PR is in good shape. I do have a few 
aditional questions/remarks:
* Maybe should add some documentation to ```DeclarativeAggregate``` to 
explain why developers have to use lazy vals for all expressions involving the 
child expression(s).
* ```MultipleDistinctRewriter``` is in a 'funny' file/location. Shouldn't 
we move it to the analyzer?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11594][SQL][REPL] Cannot create UDAF in...

2015-11-11 Thread hvanhovell
Github user hvanhovell closed the pull request at:

https://github.com/apache/spark/pull/9568


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11594][SQL][REPL] Cannot create UDAF in...

2015-11-11 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/9568#issuecomment-155710292
  
Move to scala 2.10.5 fixed this. Closing PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11594][SQL][REPL] Cannot create UDAF in...

2015-11-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/9568#discussion_r44469882
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala ---
@@ -452,7 +452,7 @@ private[sql] case class ScalaUDAF(
   }
 
   override def toString: String = {
-s"""${udaf.getClass.getSimpleName}(${children.mkString(",")})"""
--- End diff --

The ```getSimpleName``` call on the defined class causes the internal error.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [WIP] [SPARK-11636] [SQL] Support as for class...

2015-11-10 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/9602#issuecomment-155576674
  
Does a REPL defined class blow up with a ```java.lang.InternalError```? If 
it does, then we have the same problem: 
https://github.com/apache/spark/pull/9568


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9241][SQL] Supporting multiple DISTINCT...

2015-11-10 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/9566#issuecomment-155578114
  
I'll rebase.

The potential bug is caused by the fact that the attributes for the 
distinct columns and the expressions for the distinct columns can possibly be 
misaligned. This currently relies on the fact that a sequence maintains the 
same order as a (hash) map derived from it. This currently works, but is not 
guaranteed by the Scala collections library. I'd rather fix this than wait 
for something to go wrong.
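
A toy illustration of the alignment hazard (names and values are made up):

```
// A Seq preserves insertion order, but the Map derived from it becomes a HashMap
// once it has more than four entries and iterates in hash order, so zipping the
// two can pair up the wrong elements.
val children = Seq("c" -> 1, "a" -> 2, "b" -> 3, "d" -> 4, "e" -> 5)
val attrByChild = children.toMap

println(children.map(_._1))      // List(c, a, b, d, e) - insertion order
println(attrByChild.keys.toList) // hash order; not guaranteed to match the above
```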


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9241][SQL] Supporting multiple DISTINCT...

2015-11-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/9566#discussion_r44514711
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
 ---
@@ -545,19 +576,21 @@ abstract class AggregationQuerySuite extends 
QueryTest with SQLTestUtils with Te
   |  count(distinct value2),
   |  sum(distinct value2),
   |  count(distinct value1, value2),
+  |  longProductSum(distinct value1, value2),
--- End diff --

Yes and No.

The input we care about only consists of these tuples: ```[value1=null, 
value2=null], [value1=null, value2=1], [value1=1, value2=null], and [value1=1, 
value2=1]```

However, in the current implementation a distinct aggregate will see more 
input than those: it will also see records from other groups, with the values 
in those records nulled out. The assumption here is that an AggregateFunction 
is not changed by an all-NULL update. The only case I can think of that would 
be problematic is a ```FIRST(DISTINCT ...)```, which shouldn't be used like 
that anyway.

We could solve this by wrapping AggregateFunctions with an operator which 
will only update if the group id is correct.
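
To make the nulled-out input concrete, a toy illustration of what each distinct
aggregate sees after expansion (the column names and group-id convention are
illustrative, not the exact plan produced by the rewrite):

```
// One input row (value1 = 1, value2 = 2) is expanded once per distinct group.
// An aggregate only cares about the projection for its own group id; for rows
// belonging to other group ids its columns are nulled out, so an all-NULL update
// has to leave its buffer untouched.
case class Expanded(gid: Int, value1: Option[Int], value2: Option[Int])

val expanded = Seq(
  Expanded(1, Some(1), None),    // group 1: COUNT(DISTINCT value1)
  Expanded(2, None,    Some(2)), // group 2: COUNT(DISTINCT value2)
  Expanded(3, Some(1), Some(2))  // group 3: COUNT(DISTINCT value1, value2)
)
```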


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9241][SQL] Supporting multiple DISTINCT...

2015-11-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/9566#discussion_r44472817
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala
 ---
@@ -151,11 +151,12 @@ case class DistinctAggregationRewriter(conf: 
CatalystConf) extends Rule[LogicalP
   }
 
   // Setup unique distinct aggregate children.
-  val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq
-  val distinctAggChildAttrMap = 
distinctAggChildren.map(expressionAttributePair).toMap
--- End diff --

By calling ```toMap``` I am potentially breaking the alignment between 
```distinctAggChildren``` and ```distinctAggChildAttrs```.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11594][SQL][REPL] Cannot create UDAF in...

2015-11-10 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/9568#issuecomment-155589310
  
Sounds like a good idea. I'll add this in the morning.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11594][SQL][REPL] Cannot create UDAF in...

2015-11-10 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/9568#issuecomment-155582285
  
This is actually a Scala problem: 
https://issues.scala-lang.org/browse/SI-9051


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11451][SQL] Support single distinct cou...

2015-11-08 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/9409#issuecomment-154825955
  
retest this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11451][SQL] Support single distinct cou...

2015-11-08 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/9409#issuecomment-154826382
  
Jenkins does not like me...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11451][SQL] Support single distinct cou...

2015-11-08 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/9409#issuecomment-154831183
  
@yhuai can you get Jenkins to test this?

The bug exposed by this patch affected the regular aggregation path: as soon
as we used more than one regular aggregate, there was a chance that an
attribute and its source expression got misaligned. This has been fixed, and I
have also added a test for this situation.

If we choose not to add this to the 1.6 branch, then we have to create a
separate PR containing only the bugfix and get that one in.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11451][SQL] Support single distinct cou...

2015-11-08 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/9409#issuecomment-154825193
  
test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9830] [SQL] Remove AggregateExpression1...

2015-11-08 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/9556#issuecomment-154984531
  
retest this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11553] [SQL] Primitive Row accessors sh...

2015-11-14 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/9642#issuecomment-156693107
  
```UnsafeRow``` and ```SpecificMutableRow``` have similar problems. Shouldn't
we fix those as well? For example:

import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

val srow = new SpecificMutableRow(IntegerType :: Nil)
srow.isNullAt(0)
srow.getInt(0)

val urow = new UnsafeRow()
urow.pointTo(new Array[Byte](16), 1, 4)
urow.isNullAt(0)
urow.getInt(0)

// Result:
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
srow: org.apache.spark.sql.catalyst.expressions.SpecificMutableRow = 
[null]
res129: Boolean = true
res130: Int = 0
urow: org.apache.spark.sql.catalyst.expressions.UnsafeRow = []
res133: Boolean = false
res134: Int = 0

I'd actually rather not touch this at all. When you are using an internal API
you should be more careful and expect some quirkiness.

I can currently think of only one place where this causes problems: UDFs with
primitive parameters, where the engine will pass in default values instead of
nulls. Are there any other situations in which this causes problems?
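
For completeness, this is the kind of guarded access I have in mind when
saying callers of the internal API should be careful (a minimal sketch; the
helper and its name are assumptions, not existing API):

```scala
import org.apache.spark.sql.catalyst.InternalRow

// Check for null before touching a primitive getter; the primitive getters
// themselves return the type's default value (0 here) for a null slot.
def intOrNone(row: InternalRow, ordinal: Int): Option[Int] =
  if (row.isNullAt(ordinal)) None else Some(row.getInt(ordinal))
```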


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11553] [SQL] Primitive Row accessors sh...

2015-11-14 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/9642#issuecomment-156697230
  
> You mean discard change and only update documentation? Yes this is one 
option - I even think that it is not so bad. 

I would only update the documentation. Internal mutable rows are among the
most performance-critical classes in Spark SQL, so I am not that keen to add
(potentially unnecessary) branching to every primitive getter. Anyone using
```InternalRow```s is using a Spark SQL internal API anyway, and really should
know what they are doing.
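
To make the trade-off concrete, this is roughly the kind of per-read branching
being rejected here (illustrative only, not the actual getter code):

```scala
// Illustrative only: a getter that branches on a null bitmap for every read.
// The extra check runs on every primitive access, which is exactly the cost
// we want to avoid on this hot path.
def getIntChecked(values: Array[Int], isNull: Array[Boolean], ordinal: Int): Int =
  if (isNull(ordinal)) {
    throw new NullPointerException(s"Value at ordinal $ordinal is null")
  } else {
    values(ordinal)
  }
```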

> You mean problem caused by not introducing this change?

Yes, do you know any other problems caused by this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9830] [SQL] Remove AggregateExpression1...

2015-11-09 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/9556#issuecomment-155225000
  
PySpark results are correct, just not in the correct form :(...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9830] [SQL] Remove AggregateExpression1...

2015-11-09 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/9556#issuecomment-155224746
  
LGTM, pending a successful test build.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9830] [SQL] Remove AggregateExpression1...

2015-11-09 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/9556#discussion_r44343490
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -146,148 +146,105 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 }
   }
 
-  object HashAggregation extends Strategy {
-def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-  // Aggregations that can be performed in two phases, before and 
after the shuffle.
-  case PartialAggregation(
-  namedGroupingAttributes,
-  rewrittenAggregateExpressions,
-  groupingExpressions,
-  partialComputation,
-  child) if !canBeConvertedToNewAggregation(plan) =>
-execution.Aggregate(
-  partial = false,
-  namedGroupingAttributes,
-  rewrittenAggregateExpressions,
-  execution.Aggregate(
-partial = true,
-groupingExpressions,
-partialComputation,
-planLater(child))) :: Nil
-
-  case _ => Nil
-}
-
-def canBeConvertedToNewAggregation(plan: LogicalPlan): Boolean = plan 
match {
-  case a: logical.Aggregate =>
-if (sqlContext.conf.useSqlAggregate2 && 
sqlContext.conf.codegenEnabled) {
-  a.newAggregation.isDefined
-} else {
-  Utils.checkInvalidAggregateFunction2(a)
-  false
-}
-  case _ => false
-}
-
-def allAggregates(exprs: Seq[Expression]): Seq[AggregateExpression1] =
-  exprs.flatMap(_.collect { case a: AggregateExpression1 => a })
-  }
-
   /**
* Used to plan the aggregate operator for expressions based on the 
AggregateFunction2 interface.
*/
   object Aggregation extends Strategy {
 def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-  case p: logical.Aggregate if sqlContext.conf.useSqlAggregate2 &&
-  sqlContext.conf.codegenEnabled =>
-val converted = p.newAggregation
-converted match {
-  case None => Nil // Cannot convert to new aggregation code path.
-  case Some(logical.Aggregate(groupingExpressions, 
resultExpressions, child)) =>
-// A single aggregate expression might appear multiple times 
in resultExpressions.
-// In order to avoid evaluating an individual aggregate 
function multiple times, we'll
-// build a set of the distinct aggregate expressions and build 
a function which can
-// be used to re-write expressions so that they reference the 
single copy of the
-// aggregate function which actually gets computed.
-val aggregateExpressions = resultExpressions.flatMap { expr =>
-  expr.collect {
-case agg: AggregateExpression2 => agg
-  }
-}.distinct
-// For those distinct aggregate expressions, we create a map 
from the
-// aggregate function to the corresponding attribute of the 
function.
-val aggregateFunctionToAttribute = aggregateExpressions.map { 
agg =>
-  val aggregateFunction = agg.aggregateFunction
-  val attribute = Alias(aggregateFunction, 
aggregateFunction.toString)().toAttribute
-  (aggregateFunction, agg.isDistinct) -> attribute
-}.toMap
-
-val (functionsWithDistinct, functionsWithoutDistinct) =
-  aggregateExpressions.partition(_.isDistinct)
-if 
(functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) {
-  // This is a sanity check. We should not reach here when we 
have multiple distinct
-  // column sets (aggregate.NewAggregation will not match).
-  sys.error(
-"Multiple distinct column sets are not supported by the 
new aggregation" +
-  "code path.")
-}
+  case logical.Aggregate(groupingExpressions, resultExpressions, 
child) =>
+// A single aggregate expression might appear multiple times in 
resultExpressions.
+// In order to avoid evaluating an individual aggregate function 
multiple times, we'll
+// build a set of the distinct aggregate expressions and build a 
function which can
+// be used to re-write expressions so that they reference the 
single copy of the
+// aggregate function which actually gets computed.
+val aggregateExpressions = resultExpressions.flatMap { expr =>
+  expr.collect {
+

[GitHub] spark pull request: [SPARK-9241][SQL] Supporting multiple DISTINCT...

2015-11-09 Thread hvanhovell
GitHub user hvanhovell opened a pull request:

https://github.com/apache/spark/pull/9566

[SPARK-9241][SQL] Supporting multiple DISTINCT columns - follow-up (3)

This PR is a 2nd follow-up for 
[SPARK-9241](https://issues.apache.org/jira/browse/SPARK-9241). It contains the 
following improvements:
* Fix for a potential bug in distinct child expression and attribute 
alignment.
* Improved handling of duplicate distinct child expressions.
* Added test for distinct UDAF with multiple children.

cc @yhuai 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-9241-followup-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/9566.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #9566


commit ea4ea69ca4980163c9a31aa4f65b6db6121d62e3
Author: Herman van Hovell <hvanhov...@questtec.nl>
Date:   2015-11-09T11:44:26Z

Fix potential bug in distinct child expression and attribute alignment & 
improve handling of duplicate distinct child expressions. Added test for 
distinct UDAF with multiple children.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


