[GitHub] spark pull request: [SPARK-1442][SQL][WIP] Window Function Support...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/5604#issuecomment-97518465

Hi, I have been experimenting with Window functions in Spark SQL as well. It is partially based on this PR; you can find my work [here](https://github.com/hvanhovell/spark-window). I have deviated from the original implementation in a couple of ways:
- Implemented it as an extension to Spark SQL and not Hive. All aggregates use the Spark SQL implementations (not the Hive UDAFs).
- Use of Spark 1.4 child ordering requirements. Sorting is planned by the engine; this will be especially interesting as soon as Exchange starts supporting secondary sorting. I have tried a few sorting schemes, but this one is currently the fastest.
- Only a single window specification (grouping and ordering) is processed at a time. The analyzer should take care of multiple window specifications.
- The current implementation is semi-blocking; it processes one group at a time. This means only the rows for one group per partition are kept in memory. In the future we should also accommodate the case in which all aggregates are streaming (perhaps with some buffering).

Shall we try to join forces and come up with one good PR?

Kind regards, Herman
[GitHub] spark pull request: [SPARK-7322] [SQL] [WIP] Support Window Functi...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/6104#issuecomment-101519756

Hi, in the JIRA the following example is given:
```
df.select(
  df.store,
  df.date,
  df.sales,
  avg(df.sales).over.partitionBy(df.store)
    .orderBy(df.store)
    .rowsFollowing(0)  // this means from unbounded preceding to current row
)
```
Is there a reason why the aggregation operation has moved from the beginning (the style in the JIRA) to the end (the style above)? Are both still possible? I'd prefer the former, since it seems a bit shorter and is more recognizable coming from SQL.

On a related note: is it also an idea to be able to create a separate window (groupBy/orderBy) definition and use this definition in one or more windowed aggregates? For example:
```
val window = partitionBy($"store").orderBy($"date")
df.select(
  $"store",
  $"date",
  sum($"sales").over(window).rowsFollowing(0).as("TotalSales"),
  sum($"sales").over(window).rowsFollowing(0).rowsPreceding(2).as("SalesLast3M")
)
```
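For reference, a sketch of that reusable-window style written against the `Window`/`over` API that eventually shipped in the DataFrame functions; the method names (`rowsBetween` instead of `rowsFollowing`/`rowsPreceding`) and the `df` schema are assumptions for illustration, not part of the proposal above.
```
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

// One window definition, reused by several windowed aggregates.
val window = Window.partitionBy(col("store")).orderBy(col("date"))

df.select(
  col("store"),
  col("date"),
  sum(col("sales")).over(window.rowsBetween(Long.MinValue, 0)).as("TotalSales"),  // running total
  sum(col("sales")).over(window.rowsBetween(-2, 0)).as("SalesLast3M"))            // last 3 rows
```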
[GitHub] spark pull request: [SPARK-7712] [SQL] Move Window Functions from ...
GitHub user hvanhovell opened a pull request: https://github.com/apache/spark/pull/6278

[SPARK-7712] [SQL] Move Window Functions from Hive UDAFs to Spark Native backend [WIP]

This PR aims to improve the current window functionality in Spark SQL in the following ways:
- Moving away from Hive-backed UDAFs to a more Spark SQL native implementation. The main advantages are that some overhead can be avoided, that it is easier to extend, and that it opens up the opportunity to do more aggressive code generation (AggregateEvaluation style).
- Improve the processing for the 'between unbounded and current' and the 'between current and unbounded' cases. This was very expensive; now it performs in linear time.
- Process different frames for the same group by/order by definition in the same window clause. This should save some memory and reduce some of the overhead.

This is currently a work in progress. Quite a bit of testing needs to be done. Any feedback will be greatly appreciated. The JIRA ticket can be found here: https://issues.apache.org/jira/browse/SPARK-7712

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-7712

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/6278.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6278

commit e4b81d5276076ce1f8130b56c557ff9a4e19dce8
Author: Herman van Hovell hvanhov...@questtec.nl
Date: 2015-05-20T07:30:38Z
Moved Code from separate project into Spark code base. Added HiveQL parsing.
[GitHub] spark pull request: [SPARK-7712] [SQL] Move Window Functions from ...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/6278#issuecomment-110769725

Thanks for starting the test process. I'll take a look at the merge issues today.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
GitHub user hvanhovell opened a pull request: https://github.com/apache/spark/pull/7057

[SPARK-8638] [SQL] Window Function Performance Improvements

## Description
Performance improvements for Spark Window functions. This PR will also serve as the basis for moving away from Hive UDAFs to Spark UDAFs. See JIRA tickets SPARK-8638 and SPARK-7712 for more information. The original work, including some benchmarking code for the running case, can be found here: https://github.com/hvanhovell/spark-window

## Improvements
* Much better performance (10x) in running cases (e.g. BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) and UNBOUNDED FOLLOWING cases. The current implementation in Spark uses a sliding-window approach in these cases. This means that an aggregate is maintained for every row, so space usage is N (N being the number of rows). It also means that all these aggregates need to be updated separately, which takes N*(N-1)/2 updates. The running case differs from the sliding case because we are only adding data to an aggregate function (no reset is required); we only need to maintain one aggregate (like in the UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING case), update the aggregate for each row, and get the aggregate value after each update. This is what the new implementation does. This approach only uses 1 buffer and only requires N updates; I am currently working on data with window sizes of 500-1000 doing running sums, and this saves a lot of time. The CURRENT ROW AND UNBOUNDED FOLLOWING case also uses this approach and the fact that aggregate operations are commutative; there is one twist though: it will process the input buffer in reverse (a sketch of this follows below).
* Fewer comparisons in the sliding case. The current implementation determines frame boundaries for every input row. The new implementation makes more use of the fact that the window is sorted, maintains the boundaries, and only moves them when the current row order changes. This is a minor improvement.
* A single Window node is able to process all types of Frames for the same Partitioning/Ordering. This saves a little time/memory spent buffering and managing partitions. This will be enabled in a follow-up PR.
* A lot of the staging code is moved from the execution phase to the initialization phase. Minor performance improvement, and improves readability of the execution code.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-8638

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/7057.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #7057

commit f161920218c880e4f2804b19db129936d0d34d61
Author: Herman van Hovell hvanhov...@questtec.nl
Date: 2015-06-27T15:39:53Z
Major overhaul of Window operator.

commit 22e51d39c833d0e69a788ad41b8bd790688b9bd9
Author: Herman van Hovell hvanhov...@questtec.nl
Date: 2015-06-27T15:54:00Z
Fixed aggregation and range betweens with unbounded test - two different window frames were compared.

commit ad7820c6ac04f188e8a6239d314b680c8a6b4551
Author: Herman van Hovell hvanhov...@questtec.nl
Date: 2015-06-27T16:00:23Z
Disabled Tests 42 43 because tiny numerical differences in answers.
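A minimal sketch of the running-frame idea from the first improvement above: one accumulator per partition, one update per row (N updates instead of roughly N*(N-1)/2), with the UNBOUNDED FOLLOWING case reusing the same loop on the reversed input. Plain Doubles stand in for a real aggregate buffer; this illustrates the idea, not the Window operator code itself.
```
// Running frame: UNBOUNDED PRECEDING AND CURRENT ROW.
def runningSums(partition: Seq[Double]): Seq[Double] = {
  var acc = 0.0
  partition.map { value =>
    acc += value   // single buffer, updated once per row
    acc            // emit the aggregate value after every update
  }
}

// Shrinking frame: CURRENT ROW AND UNBOUNDED FOLLOWING. Because the aggregate update is
// commutative, running the same loop over the reversed input produces the right values.
def reverseRunningSums(partition: Seq[Double]): Seq[Double] =
  runningSums(partition.reverse).reverse
```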
[GitHub] spark pull request: [SPARK-7712] [SQL] Move Window Functions from ...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/6278#issuecomment-111584747

A small status update:
- The PR currently passes all tests.
- The code has been rebased, but this is a moving target due to the frequent commits to the SQL code in general, and to the FunctionRegistry in particular.

There are a few things left:
- The use of Expression trees instead of Expression classes for the more advanced window functions needs to be discussed.
- Some of the tests can be moved from Hive to Core.

Other things can be handled in follow-up PRs.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r33641164 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- @@ -37,443 +59,622 @@ case class Window( child: SparkPlan) extends UnaryNode { - override def output: Seq[Attribute] = -(projectList ++ windowExpression).map(_.toAttribute) + override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute) - override def requiredChildDistribution: Seq[Distribution] = + override def requiredChildDistribution: Seq[Distribution] = { if (windowSpec.partitionSpec.isEmpty) { - // This operator will be very expensive. + // Only show warning when the number of bytes is larger than 100 MB? + logWarning(No Partition Defined for Window operation! Moving all data to a single ++ partition, this can cause serious performance degradation.) AllTuples :: Nil -} else { - ClusteredDistribution(windowSpec.partitionSpec) :: Nil -} - - // Since window functions are adding columns to the input rows, the child's outputPartitioning - // is preserved. - override def outputPartitioning: Partitioning = child.outputPartitioning --- End diff -- Not really. I made a ton of changes (it is more of a rewrite than a small patch). I can put the documentation back in, the functionality, can be found in lines 70-76. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r33641261 --- Diff: sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala --- @@ -749,7 +749,7 @@ abstract class HiveWindowFunctionQueryBaseSuite extends HiveComparisonTest with |range between current row and unbounded following) as s1 |from part .stripMargin, reset = false) - + */ --- End diff --

I think this is due to the reverse-order processing of these frames. The differences are truly tiny: a few ULPs.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r33641591 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala --- @@ -189,7 +189,7 @@ class HiveDataFrameWindowSuite extends QueryTest { df.select( $key, last(value).over( - Window.partitionBy($value).orderBy($key).rangeBetween(1, Long.MaxValue)) + Window.partitionBy($value).orderBy($key).rangeBetween(-1, 0)) --- End diff --

Yes, it was wrong. The default behavior when Hive parses this partial frame definition '..RANGE 1 preceding...' is to expand it into '..RANGE BETWEEN 1 PRECEDING AND CURRENT ROW...', and not into '..RANGE BETWEEN 1 FOLLOWING AND UNBOUNDED PRECEDING...'. So the test was wrong. But this still strikes me as odd: how did this pass previous tests? Did the default behavior change?
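Restating the expansion being discussed, as a sketch against the 1.4-era WindowSpec API (the key/value DataFrame is assumed, mirroring the test above): the partial HiveQL frame `RANGE 1 PRECEDING` is shorthand for `RANGE BETWEEN 1 PRECEDING AND CURRENT ROW`, which is exactly what the corrected `rangeBetween(-1, 0)` expresses.
```
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last}

// "RANGE 1 PRECEDING" expands to "RANGE BETWEEN 1 PRECEDING AND CURRENT ROW".
val spec = Window.partitionBy(col("value")).orderBy(col("key")).rangeBetween(-1, 0)
df.select(col("key"), last(col("value")).over(spec))
```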
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r33641018 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- @@ -19,17 +19,39 @@ package org.apache.spark.sql.execution import java.util -import org.apache.spark.rdd.RDD +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.types.IntegerType +import org.apache.spark.rdd.RDD import org.apache.spark.util.collection.CompactBuffer +import scala.collection.mutable /** * :: DeveloperApi :: - * For every row, evaluates `windowExpression` containing Window Functions and attaches - * the results with other regular expressions (presented by `projectList`). - * Evert operator handles a single Window Specification, `windowSpec`. + * This class calculates and outputs (windowed) aggregates over the rows in a single sorted group. + * The aggregates are calculated for each row in the group. An aggregate can take a few forms: + * - Global: The aggregate is calculated for the entire group. Every row has the same value. + * - Rows: The aggregate is calculated based on a subset of the window, and is unique for each + * row and depends on the position of the given row within the window. The group must be sorted + * for this to produce sensible output. Examples are moving averages, running sums and row + * numbers. + * - Range: The aggregate is calculated based on a subset of the window, and is unique for each + * value of the order by clause and depends on its ordering. The group must be sorted for this to + * produce sensible output. + * - Shifted: The aggregate is a displaced value relative to the position of the given row. + * Examples are Lead and Lag. --- End diff -- I agree. There are still a few other documentation inconsistencies, and I'll try to fix those as well. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
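To make the four aggregate forms listed in the quoted documentation concrete, here is a sketch using the 1.4-style DataFrame window API; the store/date/sales columns are made up for illustration and are not part of the patch.
```
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col, lag, sum}

val w = Window.partitionBy(col("store")).orderBy(col("date"))

df.select(
  sum(col("sales")).over(Window.partitionBy(col("store"))).as("global"), // Global: whole group, same value for every row
  avg(col("sales")).over(w.rowsBetween(-2, 0)).as("rows"),               // Rows: moving average over a row-based frame
  sum(col("sales")).over(w.rangeBetween(Long.MinValue, 0)).as("range"),  // Range: frame defined by the order-by value
  lag(col("sales"), 1).over(w).as("shifted"))                            // Shifted: value displaced relative to the row (Lag)
```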
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r33642234 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- @@ -37,443 +59,622 @@ case class Window( child: SparkPlan) extends UnaryNode { - override def output: Seq[Attribute] = -(projectList ++ windowExpression).map(_.toAttribute) + override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute) - override def requiredChildDistribution: Seq[Distribution] = + override def requiredChildDistribution: Seq[Distribution] = { if (windowSpec.partitionSpec.isEmpty) { - // This operator will be very expensive. + // Only show warning when the number of bytes is larger than 100 MB? + logWarning(No Partition Defined for Window operation! Moving all data to a single ++ partition, this can cause serious performance degradation.) AllTuples :: Nil -} else { - ClusteredDistribution(windowSpec.partitionSpec) :: Nil -} - - // Since window functions are adding columns to the input rows, the child's outputPartitioning - // is preserved. - override def outputPartitioning: Partitioning = child.outputPartitioning --- End diff -- It is on line 77 in the new version. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r33642464 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- @@ -19,17 +19,39 @@ package org.apache.spark.sql.execution import java.util -import org.apache.spark.rdd.RDD +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.types.IntegerType +import org.apache.spark.rdd.RDD import org.apache.spark.util.collection.CompactBuffer +import scala.collection.mutable /** * :: DeveloperApi :: - * For every row, evaluates `windowExpression` containing Window Functions and attaches - * the results with other regular expressions (presented by `projectList`). - * Evert operator handles a single Window Specification, `windowSpec`. + * This class calculates and outputs (windowed) aggregates over the rows in a single sorted group. + * The aggregates are calculated for each row in the group. An aggregate can take a few forms: + * - Global: The aggregate is calculated for the entire group. Every row has the same value. + * - Rows: The aggregate is calculated based on a subset of the window, and is unique for each + * row and depends on the position of the given row within the window. The group must be sorted + * for this to produce sensible output. Examples are moving averages, running sums and row + * numbers. + * - Range: The aggregate is calculated based on a subset of the window, and is unique for each + * value of the order by clause and depends on its ordering. The group must be sorted for this to + * produce sensible output. + * - Shifted: The aggregate is a displaced value relative to the position of the given row. + * Examples are Lead and Lag. --- End diff -- The PR also optimizes the processing of Moving and Shrinking frames: * For moving frame processing the number of comparisons are reduced. This didn't look like the most rewarding improvement, but I was surprised to find it did improved performance by quite a margin. * Shrinking frames are indeed processed in reverse order. Which makes building it as fast as the growing case (it uses more memory though). I share your concerns, and solving this at the root (the function itself) would indeed be the best. I'll revert this for now, and file a JIRA request for future reference. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9740] [SPARK-9592] [SQL] Change the def...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/8113#discussion_r36873052 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala --- @@ -130,24 +147,67 @@ case class First(child: Expression) extends AlgebraicAggregate { private val first = AttributeReference(first, child.dataType)() - override val bufferAttributes = first :: Nil + private val valueSet = AttributeReference(valueSet, BooleanType)() + + override val bufferAttributes = first :: valueSet :: Nil override val initialValues = Seq( -/* first = */ Literal.create(null, child.dataType) +/* first = */ Literal.create(null, child.dataType), +/* valueSet = */ Literal.create(false, BooleanType) ) - override val updateExpressions = Seq( -/* first = */ If(IsNull(first), child, first) - ) + override val updateExpressions = { +val litTrue = Literal.create(true, BooleanType) +if (ignoreNulls) { + Seq( +/* first = */ If(Or(valueSet, IsNull(child)), first, child), +/* valueSet = */ If(Or(valueSet, IsNull(child)), valueSet, litTrue) --- End diff -- Nit: ```Or(valueSet, Not(IsNull(child)))``` is a bit shorter. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9740] [SPARK-9592] [SQL] Change the def...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/8113#discussion_r36873218 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala --- @@ -130,24 +147,67 @@ case class First(child: Expression) extends AlgebraicAggregate { private val first = AttributeReference(first, child.dataType)() - override val bufferAttributes = first :: Nil + private val valueSet = AttributeReference(valueSet, BooleanType)() + + override val bufferAttributes = first :: valueSet :: Nil override val initialValues = Seq( -/* first = */ Literal.create(null, child.dataType) +/* first = */ Literal.create(null, child.dataType), +/* valueSet = */ Literal.create(false, BooleanType) ) - override val updateExpressions = Seq( -/* first = */ If(IsNull(first), child, first) - ) + override val updateExpressions = { +val litTrue = Literal.create(true, BooleanType) +if (ignoreNulls) { + Seq( +/* first = */ If(Or(valueSet, IsNull(child)), first, child), +/* valueSet = */ If(Or(valueSet, IsNull(child)), valueSet, litTrue) + ) +} else { + Seq( +/* first = */ If(valueSet, first, child), +/* valueSet = */ litTrue + ) +} + } - override val mergeExpressions = Seq( -/* first = */ If(IsNull(first.left), first.right, first.left) - ) + override val mergeExpressions = { +val litTrue = Literal.create(true, BooleanType) +if (ignoreNulls) { + Seq( +/* first = */ If(Or(valueSet.left, IsNull(first.right)), first.left, first.right), +/* valueSet = */ If(Or(valueSet.left, IsNull(first.right)), valueSet.left, litTrue) + ) +} else { + Seq( +/* first = */ If(valueSet.left, first.left, first.right), +/* valueSet = */ litTrue --- End diff -- Is it possible that two non-updated aggregates get merged? If it is, then this should be: ```Or(valueSet.left. valueSet.right)``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9740] [SQL] Change the default behavior...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/8113#discussion_r36818586 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala --- @@ -130,23 +138,36 @@ case class First(child: Expression) extends AlgebraicAggregate { private val first = AttributeReference(first, child.dataType)() - override val bufferAttributes = first :: Nil + private val valueSet = AttributeReference(valueSet, BooleanType)() + + override val bufferAttributes = first :: valueSet :: Nil override val initialValues = Seq( -/* first = */ Literal.create(null, child.dataType) +/* first = */ Literal.create(null, child.dataType), +/* valueSet = */ Literal.create(false, BooleanType) ) override val updateExpressions = Seq( -/* first = */ If(IsNull(first), child, first) +/* first = */ If(valueSet, first, child), +/* valueSet = */ If(valueSet, valueSet, Literal.create(true, BooleanType)) ) override val mergeExpressions = Seq( -/* first = */ If(IsNull(first.left), first.right, first.left) +/* first = */ If(valueSet, first.left, first.right), +/* valueSet = */ If(valueSet, valueSet, Literal.create(true, BooleanType)) --- End diff -- We could for now simplify this to ```Literal.create(true, BooleanType)```. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9740] [SPARK-9592] [SQL] Change the def...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/8113#issuecomment-130343660

One more small thing. We should probably also add the ```ignoreNulls``` option to the ```first``` and ```last``` dataframe functions.
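A sketch of the overloads this suggests in org.apache.spark.sql.functions; the exact signatures and the column name are illustrative and may differ from what was eventually added.
```
import org.apache.spark.sql.functions.{col, first, last}

// first/last variants that skip nulls instead of returning whatever value happens to come first/last.
val firstSale = first(col("sale"), ignoreNulls = true)
val lastSale  = last(col("sale"), ignoreNulls = true)
```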
[GitHub] spark pull request: [SPARK-10100] [SQL] Performance improvements to...
GitHub user hvanhovell opened a pull request: https://github.com/apache/spark/pull/8298

[SPARK-10100] [SQL] Performance improvements to new MIN/MAX aggregate functions.

The new MIN/MAX functions suffer from a performance regression. This PR aims to fix this by simplifying the evaluation of the MIN/MAX functions. See the JIRA [ticket](https://issues.apache.org/jira/browse/SPARK-10100) for more information.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-10100

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/8298.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #8298

commit 2fed4dcdddbb22d676a78795a3778c6f02a229ab
Author: Herman van Hovell hvanhov...@questtec.nl
Date: 2015-08-19T02:36:32Z
Performance tweaks to the Min/Max functions: removed a branch in their evaluation.
[GitHub] spark pull request: [SPARK-9741][SQL] Approximate Count Distinct u...
GitHub user hvanhovell opened a pull request: https://github.com/apache/spark/pull/8362

[SPARK-9741][SQL] Approximate Count Distinct using the new UDAF interface.

This PR implements a HyperLogLog based Approximate Count Distinct function using the new UDAF interface. The implementation is inspired by the ClearSpring HyperLogLog implementation and should produce the same results. There is still some documentation and testing left to do.

cc @yhuai

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-9741

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/8362.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #8362

commit f52de0236fbfb85053bd6538b12025ba23d34ee1
Author: Herman van Hovell hvanhov...@questtec.nl
Date: 2015-08-21T02:32:15Z
Created HyperLogLog aggregate.

commit 8ec27b93da4f2d39da7c72f7a88c2742cd38318a
Author: Herman van Hovell hvanhov...@questtec.nl
Date: 2015-08-21T12:57:27Z
Added HLL to conversions. More doc. Improvement.

commit e178d9e942720c879dc947a5fd3e6593a351f04f
Author: Herman van Hovell hvanhov...@questtec.nl
Date: 2015-08-21T17:56:39Z
Bug fixes. Style. Documentation.
[GitHub] spark pull request: [SPARK-9741][SQL] Approximate Count Distinct u...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/8362#issuecomment-133566608

Thanks. I was aiming for compatibility with the existing approxCountDistinct, but we can also implement HLL++. HLL++ introduces three (orthogonal) refinements: 64-bit hashing, better low-cardinality corrections and a sparse encoding scheme. The first two refinements are easy to add. The third will require a bit more effort.

Unit testing this is a bit of a challenge. End-to-end (blackbox) testing is no problem, as long as we know what the result should be, or if we do random testing (results should be within 5% of the actual value). Testing parts of the algorithm is a bit of a PITA:
* It is hard to reason about the results (the updated registers) HLL produces.
* Register access code and HLL code are intertwined.

Both the [ClearSpring](https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLog.java) and [AggregateKnowledge](https://github.com/aggregateknowledge/java-hll/blob/master/src/main/java/net/agkn/hll/HLL.java) implementations resort to blackbox testing. I will create some blackbox tests.
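A sketch of the blackbox-testing approach described here, assuming a SQLContext is in scope; the dataset and the 5% tolerance are taken from the comment above, not from the PR's actual test suite.
```
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{approxCountDistinct, col, countDistinct}

// 100k rows with 7919 distinct values; compute the approximate and exact counts in one pass.
val df = sqlContext.range(0, 100000).select((col("id") % 7919).as("id"))
val Row(approx: Long, exact: Long) =
  df.select(approxCountDistinct(col("id")), countDistinct(col("id"))).head()

// Blackbox/random check: the estimate should land within 5% of the true cardinality.
assert(math.abs(approx - exact).toDouble / exact < 0.05)
```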
[GitHub] spark pull request: [SPARK-8640] [SQL] Enable Processing of Multip...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7515#discussion_r35946650 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -818,7 +818,8 @@ class Analyzer( failAnalysis(s$expr has multiple Window Specifications ($distinctWindowSpec). + sPlease file a bug report with this error message, stack trace, and the query.) } else { - distinctWindowSpec.head + val unbounded = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing) --- End diff --

I think extracting the grouping by and order by clauses is better, and it also conveys the message that any Window operator should be capable of processing multiple frames. The impact of the PR will be a bit larger in that case; I'll have a look at it tomorrow.
[GitHub] spark pull request: [SPARK-9482] [SQL] Fix thread-safety issue of u...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7940#discussion_r36343368 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala --- @@ -47,7 +47,7 @@ case class BroadcastNestedLoopJoin( override def outputsUnsafeRows: Boolean = left.outputsUnsafeRows || right.outputsUnsafeRows override def canProcessUnsafeRows: Boolean = true - @transient private[this] lazy val resultProjection: InternalRow = InternalRow = { + @transient private[this] def genResultProjection: InternalRow = InternalRow = { --- End diff --

```@transient def```? Does that work? Maybe not the most constructive feedback, but it stood out...
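A minimal, self-contained sketch of the thread-safety concern behind this change; the toy class below only stands in for catalyst's MutableProjection and is not the Spark code. A shared `lazy val` hands every thread the same mutable buffer, while a factory method gives each caller its own instance. Note that `@transient` is only meaningful on a field, which is presumably why it looks odd on the `def` in the diff.
```
class ToyProjection {
  private val buffer = new Array[Any](1)                  // mutable state, like a MutableProjection's target row
  def apply(value: Any): Array[Any] = { buffer(0) = value; buffer }
}

class JoinLikeOperator extends Serializable {
  @transient private lazy val sharedProjection = new ToyProjection    // one instance, shared by all task threads
  private def genResultProjection: ToyProjection = new ToyProjection  // a fresh instance per call
}
```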
[GitHub] spark pull request: [SPARK-7712] [SQL] Move Window Functions from ...
Github user hvanhovell closed the pull request at: https://github.com/apache/spark/pull/6278
[GitHub] spark pull request: [SPARK-7712] [SQL] Move Window Functions from ...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/6278#issuecomment-126397148

This PR is redundant now. See SPARK-8638, SPARK-8640 and SPARK-8641 for the proposed implementation.
[GitHub] spark pull request: [SPARK-9729] [SPARK-9363] [WIP] [SQL] Use sort...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7904#discussion_r36487283 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala --- @@ -56,117 +52,247 @@ case class SortMergeJoin( @transient protected lazy val leftKeyGenerator = newProjection(leftKeys, left.output) @transient protected lazy val rightKeyGenerator = newProjection(rightKeys, right.output) + protected[this] def isUnsafeMode: Boolean = { +// TODO(josh): there is an existing bug here: this should also check whether unsafe mode +// is enabled. also, the default for self.codegenEnabled looks inconsistent to me. +codegenEnabled UnsafeProjection.canSupport(leftKeys) UnsafeProjection.canSupport(schema) + } + + // TODO(josh): this will need to change once we use an Unsafe row joiner + override def outputsUnsafeRows: Boolean = false + override def canProcessUnsafeRows: Boolean = isUnsafeMode + override def canProcessSafeRows: Boolean = !isUnsafeMode + private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] = { // This must be ascending in order to agree with the `keyOrdering` defined in `doExecute()`. keys.map(SortOrder(_, Ascending)) } protected override def doExecute(): RDD[InternalRow] = { -val leftResults = left.execute().map(_.copy()) -val rightResults = right.execute().map(_.copy()) - -leftResults.zipPartitions(rightResults) { (leftIter, rightIter) = +left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) = new Iterator[InternalRow] { // An ordering that can be used to compare keys from both sides. private[this] val keyOrdering = newNaturalAscendingOrdering(leftKeys.map(_.dataType)) -// Mutable per row objects. +private[this] var currentLeftRow: InternalRow = _ +private[this] var currentRightMatches: CompactBuffer[InternalRow] = _ +private[this] var currentMatchIdx: Int = -1 +private[this] val smjScanner = new SortMergeJoinScanner( + leftKeyGenerator, + rightKeyGenerator, + keyOrdering, + leftIter, + rightIter +) private[this] val joinRow = new JoinedRow -private[this] var leftElement: InternalRow = _ -private[this] var rightElement: InternalRow = _ -private[this] var leftKey: InternalRow = _ -private[this] var rightKey: InternalRow = _ -private[this] var rightMatches: CompactBuffer[InternalRow] = _ -private[this] var rightPosition: Int = -1 -private[this] var stop: Boolean = false -private[this] var matchKey: InternalRow = _ - -// initialize iterator -initialize() - -override final def hasNext: Boolean = nextMatchingPair() - -override final def next(): InternalRow = { - if (hasNext) { -// we are using the buffered right rows and run down left iterator -val joinedRow = joinRow(leftElement, rightMatches(rightPosition)) -rightPosition += 1 -if (rightPosition = rightMatches.size) { - rightPosition = 0 - fetchLeft() - if (leftElement == null || keyOrdering.compare(leftKey, matchKey) != 0) { -stop = false -rightMatches = null - } -} -joinedRow - } else { -// no more result -throw new NoSuchElementException - } -} -private def fetchLeft() = { - if (leftIter.hasNext) { -leftElement = leftIter.next() -leftKey = leftKeyGenerator(leftElement) +override final def hasNext: Boolean = + (currentMatchIdx != -1 currentMatchIdx currentRightMatches.length) || fetchNext() + +private[this] def fetchNext(): Boolean = { + if (smjScanner.findNextInnerJoinRows()) { +currentRightMatches = smjScanner.getBuildMatches +currentLeftRow = smjScanner.getStreamedRow +currentMatchIdx = 0 +true } else { -leftElement = null +currentRightMatches = null 
+currentLeftRow = null +currentMatchIdx = -1 +false } } -private def fetchRight() = { - if (rightIter.hasNext) { -rightElement = rightIter.next() -rightKey = rightKeyGenerator(rightElement) - } else { -rightElement
[GitHub] spark pull request: [SPARK-9357][SQL] Remove JoinedRow/Introduce J...
GitHub user hvanhovell opened a pull request: https://github.com/apache/spark/pull/7942

[SPARK-9357][SQL] Remove JoinedRow/Introduce JoinedProjection [WIP]

```JoinedRow```'s are used to join two rows together, and are used in many of the most performance-critical sections of Spark. The problem with ```JoinedRow``` is that it is an extra layer of indirection, and that the current code has branches; both are serious performance bottlenecks.

This PR introduces ```JoinedProjection``` and replaces ```JoinedRow``` as the primary method of combining two rows. A ```JoinedProjection``` is a function that takes a left and a right row as its input, and combines these using the given expressions. ```JoinedRow``` cannot be removed because it provides the only way to do interpreted joined projections (Expression ```eval``` only takes one row as its argument), and because the code generation fallback relies on it.

The current implementation supports the interpreted and code generated paths, and has been applied to all aggregate operators in Spark SQL. Other operators using ```JoinedRow```, i.e. Joins, Generate and PythonUDF, can be converted in follow-up PRs.

cc @yhuai @rxin

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-9357

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/7942.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #7942

commit 4afec8c5d22cc3483e8331193aa52f4f6302b31f
Author: Herman van Hovell hvanhov...@questtec.nl
Date: 2015-08-04T05:56:26Z
WIP - Initial Commit. It compiles. Now make it work.

commit 0f1be99d3467d021829b7654d9efc986fc120fa7
Author: Herman van Hovell hvanhov...@questtec.nl
Date: 2015-08-04T15:51:51Z
Clean-up. Replaced non-joined generate path to two paths. Factored out some more expression support.

commit e6c5f076fd4bdeed6cd0b75764b6ba127b9fb84c
Author: Herman van Hovell hvanhov...@questtec.nl
Date: 2015-08-04T16:43:06Z
Removed Joined Row From Aggregate Operators.

commit 05914722bcd0a7508536470c86c1b61628674563
Author: Herman van Hovell hvanhov...@questtec.nl
Date: 2015-08-04T16:47:56Z
Style Fixes.

commit 81f11325eb512aa4fc986d323e16a36a9db85185
Author: Herman van Hovell hvanhov...@questtec.nl
Date: 2015-08-04T17:02:38Z
Non-Branching JoinedRow.

commit 296a073ede6c195ce6a08d9e1f84176d086dfd0c
Author: Herman van Hovell hvanhov...@questtec.nl
Date: 2015-08-04T20:30:42Z
Fix CodeGenFallback path. Bugfixes.
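As a purely conceptual sketch of the abstraction described above (a function that combines a left and a right row using the given expressions), here is an interpreted variant; the helper name and the use of InterpretedMutableProjection are illustrative assumptions, not the actual API introduced by this patch.
```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, InterpretedMutableProjection, JoinedRow}

// Bind the result expressions against the concatenated left/right schema once, then combine
// each pair of rows. The interpreted path still pipes through a JoinedRow (as the PR notes it
// must); a code-generated path would emit the combination directly. Note that, as with any
// MutableProjection, the returned row is reused between calls.
def buildJoinedProjection(
    exprs: Seq[Expression],
    leftSchema: Seq[Attribute],
    rightSchema: Seq[Attribute]): (InternalRow, InternalRow) => InternalRow = {
  val projection = new InterpretedMutableProjection(exprs, leftSchema ++ rightSchema)
  val joined = new JoinedRow
  (left: InternalRow, right: InternalRow) => projection(joined(left, right))
}
```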
[GitHub] spark pull request: [SPARK-9980][BUILD] Fix SBT publishLocal error...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/8209#issuecomment-131206616

I have replaced ```<p/>``` tags by ```<p>``` in all Java files I could find them in. I haven't touched the ```BytesToByteMap```, ```Bin``` and ```PagedTable``` classes because these are written in Scala and Scaladoc has no problem with ```<```'s.
[GitHub] spark pull request: [SPARK-9980][BUILD] Fix SBT publishLocal error...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/8209#issuecomment-131210873

I'd rather leave the Scala source alone for now.
[GitHub] spark pull request: [SPARK-9980][BUILD] Fix SBT publishLocal error...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/8209#issuecomment-131198780

I'll change those as well. It is strange that these didn't create problems; I guess it has something to do with the position of the ```<``` in the line.
[GitHub] spark pull request: [SPARK-9980][BUILD] Fix SBT publishLocal error...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/8209#discussion_r37103311 --- Diff: launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java --- @@ -211,10 +211,10 @@ public SparkLauncher addSparkArg(String arg) { * Adds an argument with a value to the Spark invocation. If the argument name corresponds to * a known argument, the code validates that the argument actually expects a value, and throws * an exception otherwise. - * <p/> + * <p> --- End diff --

So it turns out that javadoc in Java 8 doesn't allow self-closing elements (```<br/>``` and ```<p/>```) any more: http://stackoverflow.com/questions/26049329/javadoc-in-jdk-8-invalid-self-closing-element-not-allowed http://www.oracle.com/technetwork/java/javase/documentation/index-137868.html#format ```<p>``` is the preferred separator for a paragraph. So it is not HTML but Javadoc we are talking about. Sorry about the confusion.
[GitHub] spark pull request: [SPARK-9980][BUILD] Fix SBT publishLocal error...
GitHub user hvanhovell opened a pull request: https://github.com/apache/spark/pull/8209

[SPARK-9980][BUILD] Fix SBT publishLocal error due to invalid characters in doc

Tiny modification to a few comments so that ```sbt publishLocal``` works again.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-9980

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/8209.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #8209

commit 500fe15c3ef7c6e835d09dc9d9025c58c3467a76
Author: Herman van Hovell hvanhov...@questtec.nl
Date: 2015-08-14T17:18:42Z
Replace << by HTML friendly &lt;&lt;

commit 4a743c2cb7e6a4aa86e693f6b48e5528334ce5d6
Author: Herman van Hovell hvanhov...@questtec.nl
Date: 2015-08-14T17:33:09Z
Replace self-closing <p/> statements with <p>.
[GitHub] spark pull request: [SPARK-10001] [CORE] Allow Ctrl-C in spark-she...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/8216#discussion_r37136449 --- Diff: core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala --- @@ -17,6 +17,10 @@ package org.apache.spark.scheduler +import sun.misc.{Signal, SignalHandler} --- End diff --

This dependency can cause both runtime and compile-time problems if the JVM/compiler being used does not provide these classes. Does this work with other JVM implementations, such as OpenJDK and the IBM JDK? I am in favor of this functionality, but I think we should be very careful using platform-specific code. Luckily this is not the first time people have had to solve this problem. JLine solves it in the following way: https://github.com/jline/jline2/blob/master/src/main/java/jline/console/ConsoleReader.java#L257-L284
[GitHub] spark pull request: [SPARK-10001] [CORE] Allow Ctrl-C in spark-she...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/8216#discussion_r37137085 --- Diff: core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala --- @@ -17,6 +17,10 @@ package org.apache.spark.scheduler +import sun.misc.{Signal, SignalHandler} --- End diff --

I don't really think there is an easy way around the ```sun.misc.Signal/SignalHandler``` approach. I have to admit that my concern is moot if all major implementations support this (can anyone confirm this?). I just like the JLine approach better because it doesn't cause compile/run-time errors when the classes cannot be found; it will just default to the current behavior.
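A sketch of the graceful-fallback idea referenced here: register a SIGINT handler when the signal classes are usable, and keep the default Ctrl-C behavior otherwise. Guarding with Try only covers runtime failures; the JLine code linked above goes further and uses reflection so the classes need not even be present at compile time. The onInterrupt callback is a stand-in for whatever cancellation hook (for example, cancelling running jobs) the shell would wire up; none of this is the code in the PR itself.
```
import scala.util.Try
import sun.misc.{Signal, SignalHandler}

// Returns true when the handler was installed; false leaves Ctrl-C behavior unchanged.
def installSigIntHandler(onInterrupt: () => Unit): Boolean =
  Try {
    Signal.handle(new Signal("INT"), new SignalHandler {
      override def handle(sig: Signal): Unit = onInterrupt()
    })
  }.isSuccess
```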
[GitHub] spark pull request: [SPARK-9740] [SQL] Change the default behavior...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/8113#discussion_r36818762 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala --- @@ -130,23 +138,36 @@ case class First(child: Expression) extends AlgebraicAggregate { private val first = AttributeReference(first, child.dataType)() - override val bufferAttributes = first :: Nil + private val valueSet = AttributeReference(valueSet, BooleanType)() + + override val bufferAttributes = first :: valueSet :: Nil override val initialValues = Seq( -/* first = */ Literal.create(null, child.dataType) +/* first = */ Literal.create(null, child.dataType), +/* valueSet = */ Literal.create(false, BooleanType) ) override val updateExpressions = Seq( -/* first = */ If(IsNull(first), child, first) +/* first = */ If(valueSet, first, child), +/* valueSet = */ If(valueSet, valueSet, Literal.create(true, BooleanType)) ) override val mergeExpressions = Seq( -/* first = */ If(IsNull(first.left), first.right, first.left) +/* first = */ If(valueSet, first.left, first.right), +/* valueSet = */ If(valueSet, valueSet, Literal.create(true, BooleanType)) --- End diff -- ```Or(valueSet.left, valueSet.right)```? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9740] [SQL] Change the default behavior...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/8113#discussion_r36818719 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala --- @@ -130,23 +138,36 @@ case class First(child: Expression) extends AlgebraicAggregate { private val first = AttributeReference(first, child.dataType)() - override val bufferAttributes = first :: Nil + private val valueSet = AttributeReference(valueSet, BooleanType)() + + override val bufferAttributes = first :: valueSet :: Nil override val initialValues = Seq( -/* first = */ Literal.create(null, child.dataType) +/* first = */ Literal.create(null, child.dataType), +/* valueSet = */ Literal.create(false, BooleanType) ) override val updateExpressions = Seq( -/* first = */ If(IsNull(first), child, first) +/* first = */ If(valueSet, first, child), +/* valueSet = */ If(valueSet, valueSet, Literal.create(true, BooleanType)) --- End diff -- We could for now simplify this to ```Literal.create(true, BooleanType)```. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9740] [SQL] Change the default behavior...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/8113#issuecomment-130128926 Besides the ```valueSet``` update/merge expressions, LGTM. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9740] [SQL] Change the default behavior...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/8113#issuecomment-130128263 LGTM. One final question: shouldn't we introduce a ```skipNulls``` parameter? Or do you want to address this in a follow-up PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
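For illustration only, a hedged sketch of what such a parameter could look like from the caller's perspective; the `ignoreNulls` name, the `first(column, flag)` overload and the column names are assumptions, not the API that was eventually merged:

```scala
// Hypothetical usage: let the caller choose between "first value" and "first non-null value".
import org.apache.spark.sql.functions.first

val firstSale        = df.groupBy(df("store")).agg(first(df("sales")))
val firstNonNullSale = df.groupBy(df("store")).agg(first(df("sales"), ignoreNulls = true))
```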
[GitHub] spark pull request: [SPARK-9740] [SPARK-9592] [SQL] Change the def...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/8113#discussion_r36878009 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala --- @@ -130,24 +147,67 @@ case class First(child: Expression) extends AlgebraicAggregate { private val first = AttributeReference(first, child.dataType)() - override val bufferAttributes = first :: Nil + private val valueSet = AttributeReference(valueSet, BooleanType)() + + override val bufferAttributes = first :: valueSet :: Nil override val initialValues = Seq( -/* first = */ Literal.create(null, child.dataType) +/* first = */ Literal.create(null, child.dataType), +/* valueSet = */ Literal.create(false, BooleanType) ) - override val updateExpressions = Seq( -/* first = */ If(IsNull(first), child, first) - ) + override val updateExpressions = { +val litTrue = Literal.create(true, BooleanType) +if (ignoreNulls) { + Seq( +/* first = */ If(Or(valueSet, IsNull(child)), first, child), +/* valueSet = */ If(Or(valueSet, IsNull(child)), valueSet, litTrue) + ) +} else { + Seq( +/* first = */ If(valueSet, first, child), +/* valueSet = */ litTrue + ) +} + } - override val mergeExpressions = Seq( -/* first = */ If(IsNull(first.left), first.right, first.left) - ) + override val mergeExpressions = { +val litTrue = Literal.create(true, BooleanType) +if (ignoreNulls) { + Seq( +/* first = */ If(Or(valueSet.left, IsNull(first.right)), first.left, first.right), --- End diff -- I don't think we need to check ```IsNull(first.right)```. On a more general note: I think we don't need to separate the ```ignoreNulls = false``` and ```ignoreNulls = true``` paths. This should be fine for both: ``` /* first = */ If(valueSet.left, first.left, first.right), /* valueSet = */ Or(valueSet.left, valueSet.right) ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9740] [SPARK-9592] [SQL] Change the def...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/8113#discussion_r36877100 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala --- @@ -130,24 +147,67 @@ case class First(child: Expression) extends AlgebraicAggregate { private val first = AttributeReference(first, child.dataType)() - override val bufferAttributes = first :: Nil + private val valueSet = AttributeReference(valueSet, BooleanType)() + + override val bufferAttributes = first :: valueSet :: Nil override val initialValues = Seq( -/* first = */ Literal.create(null, child.dataType) +/* first = */ Literal.create(null, child.dataType), +/* valueSet = */ Literal.create(false, BooleanType) ) - override val updateExpressions = Seq( -/* first = */ If(IsNull(first), child, first) - ) + override val updateExpressions = { +val litTrue = Literal.create(true, BooleanType) +if (ignoreNulls) { + Seq( +/* first = */ If(Or(valueSet, IsNull(child)), first, child), +/* valueSet = */ If(Or(valueSet, IsNull(child)), valueSet, litTrue) + ) +} else { + Seq( +/* first = */ If(valueSet, first, child), +/* valueSet = */ litTrue + ) +} + } - override val mergeExpressions = Seq( -/* first = */ If(IsNull(first.left), first.right, first.left) - ) + override val mergeExpressions = { +val litTrue = Literal.create(true, BooleanType) +if (ignoreNulls) { + Seq( +/* first = */ If(Or(valueSet.left, IsNull(first.right)), first.left, first.right), +/* valueSet = */ If(Or(valueSet.left, IsNull(first.right)), valueSet.left, litTrue) --- End diff -- Nit: ```Or(valueSet.left. valueSet.right)``` is shorter. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8641][SPARK-8712][SQL] Native Spark Win...
GitHub user hvanhovell opened a pull request: https://github.com/apache/spark/pull/7715 [SPARK-8641][SPARK-8712][SQL] Native Spark Window Functions [WIP] This replaces the Hive UDAFs with native Spark SQL UDAFs using the new UDAF interface. See the JIRA ticket for more information. This is currently a work in progress. I have created the PR in order to get feedback. There are still a number of open issues: * Remove the Hive Window Function and old Window Function code path. This is completely replaced by the new interface. * Improve the check analysis messages. * A lot of testing: the Distinct aggregation path, Error messages. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hvanhovell/spark SPARK-8641 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/7715.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #7715 commit 73ad5cb4a70b07656431aeec0b8ec8cb938aba81 Author: Herman van Hovell hvanhov...@questtec.nl Date: 2015-07-22T22:06:30Z Added WindowFunctions. commit 160987d3808ce6d44b46660bb641962d5732754d Author: Herman van Hovell hvanhov...@questtec.nl Date: 2015-07-26T04:33:47Z Move to Native Spark UDAFs for window processing. commit 55aed3cf827326388c90262c0fb81f5b5bd83f19 Author: Herman van Hovell hvanhov...@questtec.nl Date: 2015-07-26T04:45:33Z Rebase make it compile again. commit 52c6319b5acdf838b997154b744595676e317668 Author: Herman van Hovell hvanhov...@questtec.nl Date: 2015-07-26T16:56:36Z Bug fixes commit abba35fabb72fd0a93b8c86fc8a723b02cb6c9fb Author: Herman van Hovell hvanhov...@questtec.nl Date: 2015-07-27T19:30:47Z Add NoOp to interpreted projections. commit fedb164b97115a2481e9e74c94c1cf371a7246fe Author: Herman van Hovell hvanhov...@questtec.nl Date: 2015-07-28T03:52:33Z Bugfixes... All tests work now. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8641][SPARK-8712][SQL] Native Spark Win...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/7715#issuecomment-125431976 cc @yhuai --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9066][SQL] Improve cartesian performanc...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7417#discussion_r35433399 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- @@ -213,10 +213,51 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object CartesianProduct extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.Join(left, right, _, None) = -execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil +// For BroadcastCartesianProduct we will broadcast the small size plan, +// for CartesianProduct we will use the small size plan as cartesian left rdd. +if (right.statistics.sizeInBytes = left.statistics.sizeInBytes) { + if (sqlContext.conf.autoBroadcastJoinThreshold 0 +right.statistics.sizeInBytes = sqlContext.conf.autoBroadcastJoinThreshold) { --- End diff -- You can use ```CanBroadcast``` extractor in order to determine that a side is Broadcastable, this will also consider the ```broadcast``` hint, and is probably more future proof. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
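To make the suggestion concrete, here is a rough sketch of how the strategy case could lean on the extractor instead of comparing `sizeInBytes` against the threshold by hand (`BroadcastCartesianProduct` is the operator proposed in this PR; the exact behavior of `CanBroadcast` — threshold or explicit hint — is as described above, not quoted):

```scala
// Sketch: CanBroadcast matches a plan that is below the broadcast threshold or carries
// an explicit broadcast hint, so both conditions are handled in one place.
case logical.Join(left, right, _, None) =>
  (left, right) match {
    case (_, CanBroadcast(r)) =>
      execution.joins.BroadcastCartesianProduct(
        planLater(left), planLater(r), joins.BuildRight) :: Nil
    case (CanBroadcast(l), _) =>
      execution.joins.BroadcastCartesianProduct(
        planLater(l), planLater(right), joins.BuildLeft) :: Nil
    case _ if right.statistics.sizeInBytes <= left.statistics.sizeInBytes =>
      execution.joins.CartesianProduct(
        planLater(left), planLater(right), joins.BuildLeft) :: Nil
    case _ =>
      execution.joins.CartesianProduct(
        planLater(left), planLater(right), joins.BuildRight) :: Nil
  }
```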
[GitHub] spark pull request: [SPARK-9066][SQL] Improve cartesian performanc...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/7417#issuecomment-124562458 @Sephiroth-Lin The performance improvement sounds really good. It seems like a good thing to put in Spark. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9066][SQL] Improve cartesian performanc...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7417#discussion_r35433038 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- @@ -213,10 +213,51 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object CartesianProduct extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.Join(left, right, _, None) = -execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil +// For BroadcastCartesianProduct we will broadcast the small size plan, +// for CartesianProduct we will use the small size plan as cartesian left rdd. +if (right.statistics.sizeInBytes = left.statistics.sizeInBytes) { + if (sqlContext.conf.autoBroadcastJoinThreshold 0 +right.statistics.sizeInBytes = sqlContext.conf.autoBroadcastJoinThreshold) { +execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right), + joins.BuildRight) :: Nil + } else { +execution.joins.CartesianProduct(planLater(left), planLater(right), + joins.BuildLeft) :: Nil + } +} else { + if (sqlContext.conf.autoBroadcastJoinThreshold 0 +left.statistics.sizeInBytes = sqlContext.conf.autoBroadcastJoinThreshold) { +execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right), + joins.BuildLeft) :: Nil + } else { +execution.joins.CartesianProduct(planLater(left), planLater(right), + joins.BuildRight) :: Nil + } +} case logical.Join(left, right, Inner, Some(condition)) = -execution.Filter(condition, - execution.joins.CartesianProduct(planLater(left), planLater(right))) :: Nil +if (right.statistics.sizeInBytes = left.statistics.sizeInBytes) { --- End diff -- This code is almost the same as the code above. I would put it in a method i.e. ```createCartesianProduct```, and wrap the result in a ```Filter``` operator. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
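A rough sketch of the suggested refactor; `createCartesianProduct` is a hypothetical helper name, and the operators are the ones proposed in this PR:

```scala
// Sketch: decide the build/broadcast side in one place; the conditional Join case then
// just wraps the result in a Filter instead of duplicating the whole decision tree.
private def createCartesianProduct(
    left: LogicalPlan,
    right: LogicalPlan): SparkPlan = {
  val threshold = sqlContext.conf.autoBroadcastJoinThreshold
  def broadcastable(plan: LogicalPlan): Boolean =
    threshold > 0 && plan.statistics.sizeInBytes <= threshold

  if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
    if (broadcastable(right)) {
      execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right), joins.BuildRight)
    } else {
      execution.joins.CartesianProduct(planLater(left), planLater(right), joins.BuildLeft)
    }
  } else {
    if (broadcastable(left)) {
      execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right), joins.BuildLeft)
    } else {
      execution.joins.CartesianProduct(planLater(left), planLater(right), joins.BuildRight)
    }
  }
}

// The two strategy cases then reduce to:
//   case logical.Join(left, right, _, None) =>
//     createCartesianProduct(left, right) :: Nil
//   case logical.Join(left, right, Inner, Some(condition)) =>
//     execution.Filter(condition, createCartesianProduct(left, right)) :: Nil
```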
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34844383 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- @@ -38,443 +84,661 @@ case class Window( child: SparkPlan) extends UnaryNode { - override def output: Seq[Attribute] = -(projectList ++ windowExpression).map(_.toAttribute) + override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute) - override def requiredChildDistribution: Seq[Distribution] = + override def requiredChildDistribution: Seq[Distribution] = { if (windowSpec.partitionSpec.isEmpty) { - // This operator will be very expensive. + // Only show warning when the number of bytes is larger than 100 MB? + logWarning(No Partition Defined for Window operation! Moving all data to a single ++ partition, this can cause serious performance degradation.) AllTuples :: Nil -} else { - ClusteredDistribution(windowSpec.partitionSpec) :: Nil -} - - // Since window functions are adding columns to the input rows, the child's outputPartitioning - // is preserved. - override def outputPartitioning: Partitioning = child.outputPartitioning - - override def requiredChildOrdering: Seq[Seq[SortOrder]] = { -// The required child ordering has two parts. -// The first part is the expressions in the partition specification. -// We add these expressions to the required ordering to make sure input rows are grouped -// based on the partition specification. So, we only need to process a single partition -// at a time. -// The second part is the expressions specified in the ORDER BY cluase. -// Basically, we first use sort to group rows based on partition specifications and then sort -// Rows in a group based on the order specification. -(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) :: Nil +} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil } - // Since window functions basically add columns to input rows, this operator - // will not change the ordering of input rows. + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - case class ComputedWindow( -unbound: WindowExpression, -windowFunction: WindowFunction, -resultAttribute: AttributeReference) - - // A list of window functions that need to be computed for each group. - private[this] val computedWindowExpressions = windowExpression.flatMap { window = -window.collect { - case w: WindowExpression = -ComputedWindow( - w, - BindReferences.bindReference(w.windowFunction, child.output), - AttributeReference(swindowResult:$w, w.dataType, w.nullable)()) + /** + * Create a bound ordering object for a given frame type and offset. A bound ordering object is + * used to determine which input row lies within the frame boundaries of an output row. + * + * This method uses Code Generation. It can only be used on the executor side. + * + * @param frameType to evaluate. This can either be Row or Range based. + * @param offset with respect to the row. + * @return a bound ordering object. + */ + private[this] def createBoundOrdering(frameType: FrameType, offset: Int): BoundOrdering = { +frameType match { + case RangeFrame = +val (exprs, current, bound) = if (offset == 0) { + // Use the entire order expression when the offset is 0. 
+ val exprs = windowSpec.orderSpec.map(_.child) + val projection = newMutableProjection(exprs, child.output) + (windowSpec.orderSpec, projection(), projection()) +} +else if (windowSpec.orderSpec.size == 1) { + // Use only the first order expression when the offset is non-null. + val sortExpr = windowSpec.orderSpec.head + val expr = sortExpr.child + // Create the projection which returns the current 'value'. + val current = newMutableProjection(expr :: Nil, child.output)() + // Create the projection which returns the current 'value' modified by adding the offset. + val boundExpr = Add(expr, Cast(Literal.create(offset, IntegerType), expr.dataType)) + val bound = newMutableProjection(boundExpr :: Nil, child.output)() + (sortExpr :: Nil, current, bound) +} +else { + sys.error(Non
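As a reading aid for the hunk above, a deliberately simplified illustration of what a bound ordering does for the row-based case; the trait and class names here are hypothetical, not the PR's exact types:

```scala
// A bound ordering answers: relative to the frame bound that belongs to the current
// output row, is a given input row before it (< 0), exactly on it (0), or past it (> 0)?
trait BoundOrdering {
  def compare(inputIndex: Int, outputIndex: Int): Int
}

// Row-based frame: the bound is simply the output row's index shifted by a fixed offset,
// e.g. offset = -1 means "one row before the current row".
final case class RowBound(offset: Int) extends BoundOrdering {
  override def compare(inputIndex: Int, outputIndex: Int): Int =
    inputIndex - (outputIndex + offset)
}
```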
[GitHub] spark pull request: [SPARK-8682][SQL][WIP] Range Join
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7379#discussion_r34891100 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastRangeJoin.scala --- @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.joins + +import org.apache.spark.sql.catalyst.util.TypeUtils +import org.apache.spark.util.ThreadUtils + +import scala.annotation.tailrec +import scala.collection.mutable +import scala.concurrent._ +import scala.concurrent.duration._ + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} + +/** + * Performs an inner range join on two tables. A range join typically has the following form: + * + * SELECT A.* + *,B.* + * FROM tableA A + *JOIN tableB B + * ON A.start = B.end + * AND A.end B.start + * + * The implementation builds a range index from the smaller build side, broadcasts this index + * to all executors. The streaming side is then matched against the index. This reduces the number + * of comparisons made by log(n) (n is the number of records in the build table) over the + * typical solution (Nested Loop Join). + * + * TODO NaN values + * TODO NULL values + * TODO Outer joins? StreamSide is quite easy/BuildSide requires bookkeeping and + * TODO This join will maintain sort order. The build side rows will also be added in a lower + * bound sorted fashion. + */ +@DeveloperApi +case class BroadcastRangeJoin( +leftKeys: Seq[Expression], +rightKeys: Seq[Expression], +equality: Seq[Boolean], +buildSide: BuildSide, +left: SparkPlan, +right: SparkPlan) + extends BinaryNode { + + private[this] lazy val (buildPlan, streamedPlan) = buildSide match { +case BuildLeft = (left, right) +case BuildRight = (right, left) + } + + private[this] lazy val (buildKeys, streamedKeys) = buildSide match { +case BuildLeft = (leftKeys, rightKeys) +case BuildRight = (rightKeys, leftKeys) + } + + override def output: Seq[Attribute] = left.output ++ right.output + + @transient + private[this] lazy val buildSideKeyGenerator: Projection = +newProjection(buildKeys, buildPlan.output) + + @transient + private[this] lazy val streamSideKeyGenerator: () = MutableProjection = +newMutableProjection(streamedKeys, streamedPlan.output) + + private[this] val timeout: Duration = { +val timeoutValue = sqlContext.conf.broadcastTimeout +if (timeoutValue 0) { + Duration.Inf +} else { + timeoutValue.seconds +} + } + + // Construct the range index. + @transient + private[this] val indexBroadcastFuture = future { +// Deal with equality. 
+val Seq(allowLowEqual: Boolean, allowHighEqual: Boolean) = buildSide match { + case BuildLeft = equality.reverse + case BuildRight = equality +} + +// Get the ordering for the datatype. +val ordering = TypeUtils.getOrdering(buildKeys.head.dataType) + +// Note that we use .execute().collect() because we don't want to convert data to Scala types +// TODO find out if the result of a sort and a collect is still sorted. +val eventifier = RangeIndex.toRangeEvent(buildSideKeyGenerator, ordering) +val events = buildPlan.execute().map(_.copy()).collect().flatMap(eventifier) + +// Create the index. +val index = RangeIndex.build(ordering, events, allowLowEqual, allowHighEqual) + +// Broadcast the index. +sparkContext.broadcast(index
[GitHub] spark pull request: [SPARK-9066][SQL] Improve cartesian performanc...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7417#discussion_r35422390 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.joins + +import scala.concurrent._ +import scala.concurrent.duration._ + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow} +import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} +import org.apache.spark.util.ThreadUtils + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class BroadcastCartesianProduct( --- End diff -- The inner join variant with (degenerate) condition ```1 = 1``` would do the same. All I am saying is that this also a way to get a broadcasting cartesian join going, and it saves some lines of code. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
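For what it's worth, the point can be illustrated at the DataFrame level: an inner join on a trivially true condition is a cartesian product, so combining it with a broadcast hint already gives a broadcast cartesian join (the `broadcast` function and the DataFrame names are assumptions for the example):

```scala
import org.apache.spark.sql.functions.{broadcast, lit}

// "ON 1 = 1" expressed as a join condition: every pair of rows matches, so this is a
// cartesian product executed with the existing broadcast inner-join machinery.
val crossed = largeDF.join(broadcast(smallDF), lit(1) === lit(1))
```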
[GitHub] spark pull request: [SPARK-8682][SQL][WIP] Range Join
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/7379#issuecomment-121250819 The current test errors are a bit odd; they shouldn't have been caused by this change, because the functionality is disabled by default. I have rebased onto the most recent master; let's see if this helps. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34602077 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- @@ -37,443 +59,622 @@ case class Window( child: SparkPlan) extends UnaryNode { - override def output: Seq[Attribute] = -(projectList ++ windowExpression).map(_.toAttribute) + override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute) - override def requiredChildDistribution: Seq[Distribution] = + override def requiredChildDistribution: Seq[Distribution] = { if (windowSpec.partitionSpec.isEmpty) { - // This operator will be very expensive. + // Only show warning when the number of bytes is larger than 100 MB? + logWarning(No Partition Defined for Window operation! Moving all data to a single ++ partition, this can cause serious performance degradation.) AllTuples :: Nil -} else { - ClusteredDistribution(windowSpec.partitionSpec) :: Nil -} - - // Since window functions are adding columns to the input rows, the child's outputPartitioning - // is preserved. - override def outputPartitioning: Partitioning = child.outputPartitioning --- End diff -- Ahhh... Now I understand. I got confused with ```outputOrdering```, sorry about that. I'll add it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34603151 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- @@ -37,443 +59,622 @@ case class Window( child: SparkPlan) extends UnaryNode { - override def output: Seq[Attribute] = -(projectList ++ windowExpression).map(_.toAttribute) + override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute) - override def requiredChildDistribution: Seq[Distribution] = + override def requiredChildDistribution: Seq[Distribution] = { if (windowSpec.partitionSpec.isEmpty) { - // This operator will be very expensive. + // Only show warning when the number of bytes is larger than 100 MB? + logWarning(No Partition Defined for Window operation! Moving all data to a single ++ partition, this can cause serious performance degradation.) AllTuples :: Nil -} else { - ClusteredDistribution(windowSpec.partitionSpec) :: Nil -} - - // Since window functions are adding columns to the input rows, the child's outputPartitioning - // is preserved. - override def outputPartitioning: Partitioning = child.outputPartitioning --- End diff -- Tiny follow-up. Now I know why I have removed it: ```org.apache.spark.sql.execution.UnaryNode```, the parent class of ```Window```, already implements this in exactly the same way: ``` private[sql] trait UnaryNode extends SparkPlan with trees.UnaryNode[SparkPlan] { self: Product = override def outputPartitioning: Partitioning = child.outputPartitioning } ``` So do I still need to add this to the ```Window``` class? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34616177 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- @@ -38,443 +68,645 @@ case class Window( child: SparkPlan) extends UnaryNode { - override def output: Seq[Attribute] = -(projectList ++ windowExpression).map(_.toAttribute) + override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute) - override def requiredChildDistribution: Seq[Distribution] = + override def requiredChildDistribution: Seq[Distribution] = { if (windowSpec.partitionSpec.isEmpty) { - // This operator will be very expensive. + // Only show warning when the number of bytes is larger than 100 MB? + logWarning(No Partition Defined for Window operation! Moving all data to a single ++ partition, this can cause serious performance degradation.) AllTuples :: Nil -} else { - ClusteredDistribution(windowSpec.partitionSpec) :: Nil -} - - // Since window functions are adding columns to the input rows, the child's outputPartitioning - // is preserved. - override def outputPartitioning: Partitioning = child.outputPartitioning - - override def requiredChildOrdering: Seq[Seq[SortOrder]] = { -// The required child ordering has two parts. -// The first part is the expressions in the partition specification. -// We add these expressions to the required ordering to make sure input rows are grouped -// based on the partition specification. So, we only need to process a single partition -// at a time. -// The second part is the expressions specified in the ORDER BY cluase. -// Basically, we first use sort to group rows based on partition specifications and then sort -// Rows in a group based on the order specification. -(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) :: Nil +} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil } - // Since window functions basically add columns to input rows, this operator - // will not change the ordering of input rows. - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) - case class ComputedWindow( -unbound: WindowExpression, -windowFunction: WindowFunction, -resultAttribute: AttributeReference) - - // A list of window functions that need to be computed for each group. - private[this] val computedWindowExpressions = windowExpression.flatMap { window = -window.collect { - case w: WindowExpression = -ComputedWindow( - w, - BindReferences.bindReference(w.windowFunction, child.output), - AttributeReference(swindowResult:$w, w.dataType, w.nullable)()) -} - }.toArray - - private[this] val windowFrame = -windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame] + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - // Create window functions. - private[this] def windowFunctions(): Array[WindowFunction] = { -val functions = new Array[WindowFunction](computedWindowExpressions.length) -var i = 0 -while (i computedWindowExpressions.length) { - functions(i) = computedWindowExpressions(i).windowFunction.newInstance() - functions(i).init() - i += 1 + /** + * Create a bound ordering object for a given frame type and offset. A bound ordering object is + * used to determine which input row lies within the frame boundaries of an output row. 
There + * are two types of boundaries that can be evaluated: + * - Row Based: A row based boundary is based on the position of the row within the partition. + * The offset indicates the number of rows above or below the current row, the frame for the + * current row starts or ends. For instance, given a row based sliding frame with a lower bound + * offset of -1 and a upper bound offset of +2. The frame for row with index 5 would range from + * index 4 to index 6. + * - Range based: A range based boundary is based on the actual value of the ORDER BY + * expression(s). The offset is used to alter the value of the ORDER BY expression, for + * instance if the current order by expression has a value of 10 and the lower bound offset + * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a + * number of constraints on the ORDER
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34617761 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- @@ -38,443 +68,645 @@ case class Window( child: SparkPlan) extends UnaryNode { - override def output: Seq[Attribute] = -(projectList ++ windowExpression).map(_.toAttribute) + override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute) - override def requiredChildDistribution: Seq[Distribution] = + override def requiredChildDistribution: Seq[Distribution] = { if (windowSpec.partitionSpec.isEmpty) { - // This operator will be very expensive. + // Only show warning when the number of bytes is larger than 100 MB? + logWarning(No Partition Defined for Window operation! Moving all data to a single ++ partition, this can cause serious performance degradation.) AllTuples :: Nil -} else { - ClusteredDistribution(windowSpec.partitionSpec) :: Nil -} - - // Since window functions are adding columns to the input rows, the child's outputPartitioning - // is preserved. - override def outputPartitioning: Partitioning = child.outputPartitioning - - override def requiredChildOrdering: Seq[Seq[SortOrder]] = { -// The required child ordering has two parts. -// The first part is the expressions in the partition specification. -// We add these expressions to the required ordering to make sure input rows are grouped -// based on the partition specification. So, we only need to process a single partition -// at a time. -// The second part is the expressions specified in the ORDER BY cluase. -// Basically, we first use sort to group rows based on partition specifications and then sort -// Rows in a group based on the order specification. -(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) :: Nil +} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil } - // Since window functions basically add columns to input rows, this operator - // will not change the ordering of input rows. - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) - case class ComputedWindow( -unbound: WindowExpression, -windowFunction: WindowFunction, -resultAttribute: AttributeReference) - - // A list of window functions that need to be computed for each group. - private[this] val computedWindowExpressions = windowExpression.flatMap { window = -window.collect { - case w: WindowExpression = -ComputedWindow( - w, - BindReferences.bindReference(w.windowFunction, child.output), - AttributeReference(swindowResult:$w, w.dataType, w.nullable)()) -} - }.toArray - - private[this] val windowFrame = -windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame] + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - // Create window functions. - private[this] def windowFunctions(): Array[WindowFunction] = { -val functions = new Array[WindowFunction](computedWindowExpressions.length) -var i = 0 -while (i computedWindowExpressions.length) { - functions(i) = computedWindowExpressions(i).windowFunction.newInstance() - functions(i).init() - i += 1 + /** + * Create a bound ordering object for a given frame type and offset. A bound ordering object is + * used to determine which input row lies within the frame boundaries of an output row. 
There + * are two types of boundaries that can be evaluated: + * - Row Based: A row based boundary is based on the position of the row within the partition. + * The offset indicates the number of rows above or below the current row, the frame for the + * current row starts or ends. For instance, given a row based sliding frame with a lower bound + * offset of -1 and a upper bound offset of +2. The frame for row with index 5 would range from + * index 4 to index 6. + * - Range based: A range based boundary is based on the actual value of the ORDER BY + * expression(s). The offset is used to alter the value of the ORDER BY expression, for + * instance if the current order by expression has a value of 10 and the lower bound offset + * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a + * number of constraints on the ORDER
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34618909 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- @@ -38,443 +68,645 @@ case class Window( child: SparkPlan) extends UnaryNode { - override def output: Seq[Attribute] = -(projectList ++ windowExpression).map(_.toAttribute) + override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute) - override def requiredChildDistribution: Seq[Distribution] = + override def requiredChildDistribution: Seq[Distribution] = { if (windowSpec.partitionSpec.isEmpty) { - // This operator will be very expensive. + // Only show warning when the number of bytes is larger than 100 MB? + logWarning(No Partition Defined for Window operation! Moving all data to a single ++ partition, this can cause serious performance degradation.) AllTuples :: Nil -} else { - ClusteredDistribution(windowSpec.partitionSpec) :: Nil -} - - // Since window functions are adding columns to the input rows, the child's outputPartitioning - // is preserved. - override def outputPartitioning: Partitioning = child.outputPartitioning - - override def requiredChildOrdering: Seq[Seq[SortOrder]] = { -// The required child ordering has two parts. -// The first part is the expressions in the partition specification. -// We add these expressions to the required ordering to make sure input rows are grouped -// based on the partition specification. So, we only need to process a single partition -// at a time. -// The second part is the expressions specified in the ORDER BY cluase. -// Basically, we first use sort to group rows based on partition specifications and then sort -// Rows in a group based on the order specification. -(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) :: Nil +} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil } - // Since window functions basically add columns to input rows, this operator - // will not change the ordering of input rows. - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) - case class ComputedWindow( -unbound: WindowExpression, -windowFunction: WindowFunction, -resultAttribute: AttributeReference) - - // A list of window functions that need to be computed for each group. - private[this] val computedWindowExpressions = windowExpression.flatMap { window = -window.collect { - case w: WindowExpression = -ComputedWindow( - w, - BindReferences.bindReference(w.windowFunction, child.output), - AttributeReference(swindowResult:$w, w.dataType, w.nullable)()) -} - }.toArray - - private[this] val windowFrame = -windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame] + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - // Create window functions. - private[this] def windowFunctions(): Array[WindowFunction] = { -val functions = new Array[WindowFunction](computedWindowExpressions.length) -var i = 0 -while (i computedWindowExpressions.length) { - functions(i) = computedWindowExpressions(i).windowFunction.newInstance() - functions(i).init() - i += 1 + /** + * Create a bound ordering object for a given frame type and offset. A bound ordering object is + * used to determine which input row lies within the frame boundaries of an output row. 
There + * are two types of boundaries that can be evaluated: + * - Row Based: A row based boundary is based on the position of the row within the partition. + * The offset indicates the number of rows above or below the current row, the frame for the + * current row starts or ends. For instance, given a row based sliding frame with a lower bound + * offset of -1 and a upper bound offset of +2. The frame for row with index 5 would range from + * index 4 to index 6. + * - Range based: A range based boundary is based on the actual value of the ORDER BY + * expression(s). The offset is used to alter the value of the ORDER BY expression, for + * instance if the current order by expression has a value of 10 and the lower bound offset + * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a + * number of constraints on the ORDER
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34857571 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.{Row, QueryTest} +import org.apache.spark.sql.expressions.Window +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.hive.test.TestHive.implicits._ + +/** + * Window expressions are tested extensively by the following test suites: + * [[org.apache.spark.sql.hive.HiveDataFrameWindowSuite]] + * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryWithoutCodeGenSuite]] + * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryFileWithoutCodeGenSuite]] + * However these suites do not cover all possible (i.e. more exotic) settings. This suite fill + * this gap. + * + * TODO Move this class to the sql/core project when we move to Native Spark UDAFs. + */ +class WindowSuite extends QueryTest { + + test(reverse sliding range frame) { +val df = Seq( + (1, Thin, Cell Phone, 6000), + (2, Normal, Tablet, 1500), + (3, Mini, Tablet, 5500), + (4, Ultra thin, Cell Phone, 5500), + (5, Very thin, Cell Phone, 6000), + (6, Big, Tablet, 2500), + (7, Bendable, Cell Phone, 3000), + (8, Foldable, Cell Phone, 3000), + (9, Pro, Tablet, 4500), + (10, Pro2, Tablet, 6500)). + toDF(id, product, category, revenue) +checkAnswer( + df.select( +$id, +avg($revenue).over(Window. + partitionBy($category). + orderBy($revenue.desc). + rangeBetween(-2000L, 1000L)). + cast(int)), +Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) :: + Row(4, 5833) :: Row(5, 5833) :: Row(6, 2000) :: + Row(7, 3000) :: Row(8, 3000) :: Row(9, 4166) :: + Row(10, 5500) :: Nil) --- End diff -- Yeah that was a mistake on my behalf. I didn't realise that offsets should be flipped on when the order is descending (I swapped the high and low offsets). I pushed a fix. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122158569 @yhuai the benchmarking results are attached. It might be interesting to see how the operator performs on different datasets. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8682][SQL][WIP] Range Join
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/7379#issuecomment-122177553 The <= case is quite easy to implement. This implementation is currently targeted at range joining a rather small (broadcastable) table to an arbitrarily large table. I don't think this matches the use case of SMJ, i.e. equi-joining arbitrarily large tables. But I might be missing something? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8682][SQL][WIP] Range Join
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7379#discussion_r34862439 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastRangeJoin.scala --- @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.joins + +import org.apache.spark.sql.catalyst.util.TypeUtils +import org.apache.spark.util.ThreadUtils + +import scala.annotation.tailrec +import scala.collection.mutable +import scala.concurrent._ +import scala.concurrent.duration._ + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} + +/** + * Performs an inner range join on two tables. A range join typically has the following form: + * + * SELECT A.* + *,B.* + * FROM tableA A + *JOIN tableB B + * ON A.start = B.end + * AND A.end B.start + * + * The implementation builds a range index from the smaller build side, broadcasts this index + * to all executors. The streaming side is then matched against the index. This reduces the number + * of comparisons made by log(n) (n is the number of records in the build table) over the + * typical solution (Nested Loop Join). + * + * TODO NaN values + * TODO NULL values + * TODO Outer joins? StreamSide is quite easy/BuildSide requires bookkeeping and + * TODO This join will maintain sort order. The build side rows will also be added in a lower + * bound sorted fashion. + */ +@DeveloperApi +case class BroadcastRangeJoin( +leftKeys: Seq[Expression], +rightKeys: Seq[Expression], +equality: Seq[Boolean], +buildSide: BuildSide, +left: SparkPlan, +right: SparkPlan) + extends BinaryNode { + + private[this] lazy val (buildPlan, streamedPlan) = buildSide match { +case BuildLeft = (left, right) +case BuildRight = (right, left) + } + + private[this] lazy val (buildKeys, streamedKeys) = buildSide match { +case BuildLeft = (leftKeys, rightKeys) +case BuildRight = (rightKeys, leftKeys) + } + + override def output: Seq[Attribute] = left.output ++ right.output + + @transient + private[this] lazy val buildSideKeyGenerator: Projection = +newProjection(buildKeys, buildPlan.output) + + @transient + private[this] lazy val streamSideKeyGenerator: () = MutableProjection = +newMutableProjection(streamedKeys, streamedPlan.output) + + private[this] val timeout: Duration = { +val timeoutValue = sqlContext.conf.broadcastTimeout +if (timeoutValue 0) { + Duration.Inf +} else { + timeoutValue.seconds +} + } + + // Construct the range index. + @transient + private[this] val indexBroadcastFuture = future { +// Deal with equality. 
+val Seq(allowLowEqual: Boolean, allowHighEqual: Boolean) = buildSide match { + case BuildLeft = equality.reverse + case BuildRight = equality +} + +// Get the ordering for the datatype. +val ordering = TypeUtils.getOrdering(buildKeys.head.dataType) + +// Note that we use .execute().collect() because we don't want to convert data to Scala types +// TODO find out if the result of a sort and a collect is still sorted. +val eventifier = RangeIndex.toRangeEvent(buildSideKeyGenerator, ordering) +val events = buildPlan.execute().map(_.copy()).collect().flatMap(eventifier) + +// Create the index. +val index = RangeIndex.build(ordering, events, allowLowEqual, allowHighEqual) + +// Broadcast the index. +sparkContext.broadcast(index
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34844502 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- @@ -38,443 +68,645 @@ case class Window( child: SparkPlan) extends UnaryNode { - override def output: Seq[Attribute] = -(projectList ++ windowExpression).map(_.toAttribute) + override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute) - override def requiredChildDistribution: Seq[Distribution] = + override def requiredChildDistribution: Seq[Distribution] = { if (windowSpec.partitionSpec.isEmpty) { - // This operator will be very expensive. + // Only show warning when the number of bytes is larger than 100 MB? + logWarning(No Partition Defined for Window operation! Moving all data to a single ++ partition, this can cause serious performance degradation.) AllTuples :: Nil -} else { - ClusteredDistribution(windowSpec.partitionSpec) :: Nil -} - - // Since window functions are adding columns to the input rows, the child's outputPartitioning - // is preserved. - override def outputPartitioning: Partitioning = child.outputPartitioning - - override def requiredChildOrdering: Seq[Seq[SortOrder]] = { -// The required child ordering has two parts. -// The first part is the expressions in the partition specification. -// We add these expressions to the required ordering to make sure input rows are grouped -// based on the partition specification. So, we only need to process a single partition -// at a time. -// The second part is the expressions specified in the ORDER BY cluase. -// Basically, we first use sort to group rows based on partition specifications and then sort -// Rows in a group based on the order specification. -(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) :: Nil +} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil } - // Since window functions basically add columns to input rows, this operator - // will not change the ordering of input rows. - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) - case class ComputedWindow( -unbound: WindowExpression, -windowFunction: WindowFunction, -resultAttribute: AttributeReference) - - // A list of window functions that need to be computed for each group. - private[this] val computedWindowExpressions = windowExpression.flatMap { window = -window.collect { - case w: WindowExpression = -ComputedWindow( - w, - BindReferences.bindReference(w.windowFunction, child.output), - AttributeReference(swindowResult:$w, w.dataType, w.nullable)()) -} - }.toArray - - private[this] val windowFrame = -windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame] + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - // Create window functions. - private[this] def windowFunctions(): Array[WindowFunction] = { -val functions = new Array[WindowFunction](computedWindowExpressions.length) -var i = 0 -while (i computedWindowExpressions.length) { - functions(i) = computedWindowExpressions(i).windowFunction.newInstance() - functions(i).init() - i += 1 + /** + * Create a bound ordering object for a given frame type and offset. A bound ordering object is + * used to determine which input row lies within the frame boundaries of an output row. 
There + * are two types of boundaries that can be evaluated: + * - Row Based: A row based boundary is based on the position of the row within the partition. + * The offset indicates the number of rows above or below the current row, the frame for the + * current row starts or ends. For instance, given a row based sliding frame with a lower bound + * offset of -1 and a upper bound offset of +2. The frame for row with index 5 would range from + * index 4 to index 6. + * - Range based: A range based boundary is based on the actual value of the ORDER BY + * expression(s). The offset is used to alter the value of the ORDER BY expression, for + * instance if the current order by expression has a value of 10 and the lower bound offset + * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a + * number of constraints on the ORDER
[GitHub] spark pull request: [SPARK-8682][SQL][WIP] Range Join
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/7379#issuecomment-12257 No problem. ### Supporting N-Ary Predicates. In order to make the range join work we need the predicates to define a single interval for each side of the join. For instance the clause ```a.low < b.high and b.low < a.high``` implies that there are two intervals: [a.low, a.high] and [b.low, b.high]. An open interval, for instance ```a.low < b.high```, would also work. When we use more than two clauses, we can potentially have multiple intervals; in your example, for instance, ```a.key b.key and a.key2 b.key2 and a.key3=b.key3``` would yield the following intervals: [a.key1, a.key2], [a.key1, a.key3], [b.key2, b.key1] and [b.key2, b.key3]. Creating a working index that can deal with the (partially) uncorrelated intervals will be quite a challenge (I haven't really looked into this yet). We could of course pick one pair of intervals to join on and use filtering to take care of the rest. I think the unary and binary cases are the most common. Let's start there, and see if there is demand for N-ary designs. ### Generalization If you consider the fact that we are joining intervals (ranges if you will), range partitioning will not work because it assumes both intervals are entirely in the same partition (they can span multiple partitions). When dealing with larger tables we would have to use a special interval-aware partitioning: this would create partitions for a number of fully covering, non-overlapping intervals, and would multicast each row to every interval it belongs to. The subsequent step would be using an index or doing a cartesian/BNL join. Doing a cartesian join in a single partition performs horribly. I thought it wouldn't be a problem either, but it completely killed the performance of an analysis I was doing for a client (account balances at specific dates). I do see opportunities for code re-use, but that would be by generalizing HashedRelation and the Broadcast join family. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
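A tiny, hypothetical sketch of the interval-aware partitioning idea mentioned above (bucket width and names invented): the key space is cut into fixed, non-overlapping buckets, and every row is multicast to each bucket its interval overlaps, so rows that can join always meet in at least one partition.

```scala
// Which buckets does the interval [low, high] touch, given a fixed bucket width?
def bucketsFor(low: Long, high: Long, bucketWidth: Long): Range = {
  val first = math.floor(low.toDouble / bucketWidth).toInt
  val last = math.floor(high.toDouble / bucketWidth).toInt
  first to last
}

// Example: a row with interval [950, 2100] and bucketWidth = 1000 is sent to buckets 0, 1 and 2.
```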
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/7513#issuecomment-122707719 @yhuai Don't think jenkins picked up your OK. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/7513#issuecomment-122674419 cc @yhuai --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
GitHub user hvanhovell opened a pull request: https://github.com/apache/spark/pull/7513 [SPARK-8638] [SQL] Window Function Performance Improvements - Cleanup This PR contains a few clean-ups that are a part of SPARK-8638: a few style issues got fixed, and a few tests were moved. Git commit message is wrong BTW :(... You can merge this pull request into a Git repository by running: $ git pull https://github.com/hvanhovell/spark SPARK-8638-cleanup Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/7513.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #7513 commit 4e69d08ab0292220a3cf67d3011947f80f931ed7 Author: Herman van Hovell hvanhov...@questtec.nl Date: 2015-07-19T15:52:28Z Fixed Perfomance Regression for Shrinking Window Frames (+Rebase) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8640] [SQL] Enable Processing of Multip...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/7515#issuecomment-122912351 cc @yhuai --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8682][SQL][WIP] Range Join
GitHub user hvanhovell opened a pull request: https://github.com/apache/spark/pull/7379 [SPARK-8682][SQL][WIP] Range Join

*...copied from JIRA (SPARK-8682):*

Currently Spark SQL uses a Broadcast Nested Loop join (or a filtered Cartesian Join) when it has to execute the following range query:

```
SELECT A.*, B.*
FROM   tableA A
JOIN   tableB B
  ON   A.start <= B.end
 AND   A.end > B.start
```

This is horribly inefficient. The performance of this query can be greatly improved, when one of the tables can be broadcasted, by creating a range index. A range index is basically a sorted map containing the rows of the smaller table, indexed by both the high and low keys. Using this structure the complexity of the query would go from O(N * M) to O(N * 2 * LOG(M)), where N = number of records in the larger table and M = number of records in the smaller (indexed) table.

This is currently a work in progress. I will be adding more tests and a small benchmark in the next couple of days. If you want to try this out, set the ```spark.sql.planner.rangeJoin``` option to ```true``` in the SQL configuration.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/hvanhovell/spark SPARK-8682 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/7379.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #7379 commit a2ff5dd2c54ca00784fc529bea2da2f05897786b Author: Herman van Hovell hvanhov...@questtec.nl Date: 2015-07-01T02:17:03Z Initial Range Join commit: Compiles Style Checks work. commit d2bd7932a2f15a41e39aca1d9fad3441b85fae44 Author: Herman van Hovell hvanhov...@questtec.nl Date: 2015-07-13T22:27:03Z Added Tests for Range Index. Ton of Bug Fixes.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
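A standalone sketch of the range-index idea from the description above, under simplifying assumptions: closed [low, high] intervals over a single Long key, and an index kept only on the low keys (the structure described in the PR also indexes the high keys to tighten the candidate scan). The names RangeIndex, IndexedRow and probe are illustrative, not the PR's API.

```scala
final case class IndexedRow(low: Long, high: Long, payload: String)

final class RangeIndex(rows: Seq[IndexedRow]) {
  private val sorted: Array[IndexedRow] = rows.sortBy(_.low).toArray
  private val lows: Array[Long] = sorted.map(_.low)

  /** All indexed rows whose [low, high] interval overlaps [probeLow, probeHigh]. */
  def probe(probeLow: Long, probeHigh: Long): Seq[IndexedRow] = {
    // Find the first position whose low key is strictly greater than probeHigh; rows at or
    // after that position start past the probe interval and can never match.
    val raw = java.util.Arrays.binarySearch(lows, probeHigh)
    var end = if (raw >= 0) raw + 1 else -(raw + 1)
    while (end < lows.length && lows(end) == probeHigh) end += 1
    // Candidates start at or before probeHigh; keep those that also reach past probeLow.
    sorted.iterator.take(end).filter(_.high >= probeLow).toSeq
  }
}
```

Each row of the streamed (larger) table calls probe once, so a lookup costs a binary search plus the filtering of the candidate prefix, instead of a full scan of the broadcasted table per probe row.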
[GitHub] spark pull request: [SPARK-9066][SQL] Improve cartesian performanc...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/7417#issuecomment-123296087 Do you have any benchmarking results for this? Would be great to see how much this improves the current situation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9066][SQL] Improve cartesian performanc...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7417#discussion_r35098517 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.joins + +import scala.concurrent._ +import scala.concurrent.duration._ + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow} +import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} +import org.apache.spark.util.ThreadUtils + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class BroadcastCartesianProduct( --- End diff -- How is this different from a [BroadcastNestedLoopJoin](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala)? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9066][SQL] Improve cartesian performanc...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7417#discussion_r34681979

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala ---
@@ -34,7 +34,15 @@ case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNod
     val leftResults = left.execute().map(_.copy())
     val rightResults = right.execute().map(_.copy())

-    leftResults.cartesian(rightResults).mapPartitions { iter =>
+    val cartesianRdd = if (leftResults.partitions.size < rightResults.partitions.size) {
+      rightResults.cartesian(leftResults).mapPartitions { iter =>
+        iter.map(tuple => (tuple._2, tuple._1))
+      }
+    } else {
+      leftResults.cartesian(rightResults)
+    }
+
+    cartesianRdd.mapPartitions { iter =>
       val joinedRow = new JoinedRow

--- End diff --

Quick question. Why not use the ```sizeInBytes```? I assume we want to move as little data as possible? Using ```sizeInBytes``` would be a bit more involved, since this would involve the planner, and (probably) adding a ```BuildSide``` parameter to ```CartesianProduct```...

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9730][SQL] Add Full Outer Join support ...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/8383#discussion_r37883244

--- Diff: unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSetMethods.java ---
@@ -68,6 +68,19 @@ public static boolean isSet(Object baseObject, long baseOffset, int index) {
   }

   /**
+   * Returns {@code true} if all bits are set.
+   */
+  public static boolean allSet(Object baseObject, long baseOffset, long bitSetWidthInWords) {
+    long addr = baseOffset;
+    for (int i = 0; i < bitSetWidthInWords; i++, addr += WORD_SIZE) {
+      if (Platform.getLong(baseObject, addr) == 0) {

--- End diff --

This confuses me. It seems that you are checking that all bits in a given word are not set (0), instead of checking that any one of the bits is not set. Am I missing something?

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
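A tiny standalone illustration of the distinction raised in the comment above, written as plain Scala over an Array[Long] of words (not the Spark code; it assumes every bit of every word belongs to the bit set):

```scala
// What the check in the diff detects: some word is completely zero.
def anyWordCompletelyUnset(words: Array[Long]): Boolean = words.exists(_ == 0L)

// What "all bits are set" actually requires: every word is all ones (-1L).
def allBitsSet(words: Array[Long]): Boolean = words.forall(_ == -1L)

// A word like 1L is neither all zeros nor all ones, which is exactly where the two checks differ.
allBitsSet(Array(-1L, 1L))              // false
anyWordCompletelyUnset(Array(-1L, 1L))  // false
```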
[GitHub] spark pull request: [SPARK-10100] [SQL] Perfomance improvements to...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/8298#issuecomment-134620114 @adrian-wang the improvement is absolutely tiny, about 2-3% if you do a lot of ```min```'s or ```max```'es. This PR was a response to a misdiagnosed performance regression; the real cause was the use of a map in key-less aggregation. The PR still adds some value, so we could add it to 1.6. @yhuai / @adrian-wang if you feel differently about this, I'll withdraw the PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9241] [SQL] [WIP] Supporting multiple D...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/9280#discussion_r43067869 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/distinctFallback.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext, CodegenFallback} +import org.apache.spark.sql.types.{AbstractDataType, DataType} +import org.apache.spark.util.collection.OpenHashSet + +/** + * Fallback operator for distinct operators. This will be used when a user issues multiple + * different distinct expressions in a query. + * + * The operator uses the OpenHashSetUDT for de-duplicating values. It is, as a result, not possible + * to use UnsafeRow based aggregation. + */ +case class DistinctAggregateFallback(function: AggregateFunction2) extends DeclarativeAggregate { + override def inputTypes: Seq[AbstractDataType] = function.inputTypes + override def nullable: Boolean = function.nullable + override def dataType: DataType = function.dataType + override def children: Seq[Expression] = Seq(function) + + private[this] val input = function.children match { +case child :: Nil => child +case children => CreateStruct(children) // TODO can we test this? + } + private[this] val items = AttributeReference("itemSet", new OpenHashSetUDT(input.dataType))() + + override def aggBufferAttributes: Seq[AttributeReference] = Seq(items) + override val initialValues: Seq[Expression] = Seq(NewSet(input.dataType)) + override val updateExpressions: Seq[Expression] = Seq(AddItemToSet(input, items)) + override val mergeExpressions: Seq[Expression] = Seq(CombineSets(items.left, items.right)) + override val evaluateExpression: Expression = function match { +case f: Count => CountSet(items) +case f: DeclarativeAggregate => ReduceSetUsingDeclarativeAggregate(items, f) +case f: ImperativeAggregate => ReduceSetUsingImperativeAggregate(items, f) + } +} + +case class ReduceSetUsingImperativeAggregate(left: Expression, right: ImperativeAggregate) + extends BinaryExpression with CodegenFallback { + + override def dataType: DataType = right.dataType + + private[this] val single = right.children.size == 1 + + // TODO can we assume that the offsets are 0 when we haven't touched them yet? 
+ private[this] val function = right +.withNewInputAggBufferOffset(0) +.withNewMutableAggBufferOffset(0) + + @transient private[this] lazy val buffer = +new SpecificMutableRow(right.aggBufferAttributes.map(_.dataType)) + + @transient private[this] lazy val singleValueInput = new GenericMutableRow(1) + + override def eval(input: InternalRow): Any = { +val result = left.eval(input).asInstanceOf[OpenHashSet[Any]] +if (result != null) { + right.initialize(buffer) + val iterator = result.iterator + if (single) { +while (iterator.hasNext) { + singleValueInput.update(0, iterator.next()) + function.update(buffer, singleValueInput) +} + } else { +while (iterator.hasNext) { + function.update(buffer, iterator.next().asInstanceOf[InternalRow]) +} + } + function.eval(buffer) +} else null + } +} + +case class ReduceSetUsingDeclarativeAggregate(left: Expression, right: DeclarativeAggregate) + extends Expression with CodegenFallback { + override def children: Seq[Expression] = Seq(left) + override def nullable: Boolean = right.nullable + override def dataType: DataType = right.d
[GitHub] spark pull request: [SPARK-9241] [SQL] [WIP] Supporting multiple D...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/9280#discussion_r43116722 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/distinctFallback.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext, CodegenFallback} +import org.apache.spark.sql.types.{AbstractDataType, DataType} +import org.apache.spark.util.collection.OpenHashSet + +/** + * Fallback operator for distinct operators. This will be used when a user issues multiple + * different distinct expressions in a query. + * + * The operator uses the OpenHashSetUDT for de-duplicating values. It is, as a result, not possible + * to use UnsafeRow based aggregation. + */ +case class DistinctAggregateFallback(function: AggregateFunction2) extends DeclarativeAggregate { + override def inputTypes: Seq[AbstractDataType] = function.inputTypes + override def nullable: Boolean = function.nullable + override def dataType: DataType = function.dataType + override def children: Seq[Expression] = Seq(function) + + private[this] val input = function.children match { +case child :: Nil => child +case children => CreateStruct(children) // TODO can we test this? + } + private[this] val items = AttributeReference("itemSet", new OpenHashSetUDT(input.dataType))() + + override def aggBufferAttributes: Seq[AttributeReference] = Seq(items) + override val initialValues: Seq[Expression] = Seq(NewSet(input.dataType)) + override val updateExpressions: Seq[Expression] = Seq(AddItemToSet(input, items)) + override val mergeExpressions: Seq[Expression] = Seq(CombineSets(items.left, items.right)) + override val evaluateExpression: Expression = function match { +case f: Count => CountSet(items) +case f: DeclarativeAggregate => ReduceSetUsingDeclarativeAggregate(items, f) +case f: ImperativeAggregate => ReduceSetUsingImperativeAggregate(items, f) + } +} + +case class ReduceSetUsingImperativeAggregate(left: Expression, right: ImperativeAggregate) + extends BinaryExpression with CodegenFallback { + + override def dataType: DataType = right.dataType + + private[this] val single = right.children.size == 1 + + // TODO can we assume that the offsets are 0 when we haven't touched them yet? 
+ private[this] val function = right +.withNewInputAggBufferOffset(0) +.withNewMutableAggBufferOffset(0) + + @transient private[this] lazy val buffer = +new SpecificMutableRow(right.aggBufferAttributes.map(_.dataType)) + + @transient private[this] lazy val singleValueInput = new GenericMutableRow(1) + + override def eval(input: InternalRow): Any = { +val result = left.eval(input).asInstanceOf[OpenHashSet[Any]] +if (result != null) { + right.initialize(buffer) + val iterator = result.iterator + if (single) { +while (iterator.hasNext) { + singleValueInput.update(0, iterator.next()) + function.update(buffer, singleValueInput) +} + } else { +while (iterator.hasNext) { + function.update(buffer, iterator.next().asInstanceOf[InternalRow]) +} + } + function.eval(buffer) +} else null + } +} + +case class ReduceSetUsingDeclarativeAggregate(left: Expression, right: DeclarativeAggregate) + extends Expression with CodegenFallback { + override def children: Seq[Expression] = Seq(left) + override def nullable: Boolean = right.nullable + override def dataType: DataType = right.d
[GitHub] spark pull request: [SPARK-9298][SQL] Add pearson correlation aggr...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/8587#discussion_r43439837 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala --- @@ -524,6 +525,133 @@ case class Sum(child: Expression) extends DeclarativeAggregate { override val evaluateExpression = Cast(currentSum, resultType) } +/** + * Compute Pearson correlation between two expressions. + * When applied on empty data (i.e., count is zero), it returns NaN. + * + * Definition of Pearson correlation can be found at + * http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient + * + * @param left one of the expressions to compute correlation with. + * @param right another expression to compute correlation with. + */ +case class Corr( +left: Expression, +right: Expression, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) + extends ImperativeAggregate { + + def children: Seq[Expression] = Seq(left, right) + + def nullable: Boolean = false + + def dataType: DataType = DoubleType + + def inputTypes: Seq[AbstractDataType] = Seq(DoubleType) + + def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes) + + def inputAggBufferAttributes: Seq[AttributeReference] = aggBufferAttributes.map(_.newInstance()) + + val aggBufferAttributes: Seq[AttributeReference] = Seq( +AttributeReference("xAvg", DoubleType)(), +AttributeReference("yAvg", DoubleType)(), +AttributeReference("Ck", DoubleType)(), +AttributeReference("MkX", DoubleType)(), +AttributeReference("MkY", DoubleType)(), +AttributeReference("count", LongType)()) + + override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate = +copy(mutableAggBufferOffset = newMutableAggBufferOffset) + + override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = +copy(inputAggBufferOffset = newInputAggBufferOffset) + + override def initialize(buffer: MutableRow): Unit = { +(0 until 5).map(idx => buffer.setDouble(mutableAggBufferOffset + idx, 0.0)) +buffer.setLong(mutableAggBufferOffset + 5, 0L) + } + + override def update(buffer: MutableRow, input: InternalRow): Unit = { +val x = left.eval(input).asInstanceOf[Double] +val y = right.eval(input).asInstanceOf[Double] + +var xAvg = buffer.getDouble(mutableAggBufferOffset) +var yAvg = buffer.getDouble(mutableAggBufferOffset + 1) +var Ck = buffer.getDouble(mutableAggBufferOffset + 2) +var MkX = buffer.getDouble(mutableAggBufferOffset + 3) +var MkY = buffer.getDouble(mutableAggBufferOffset + 4) +var count = buffer.getLong(mutableAggBufferOffset + 5) + +val deltaX = x - xAvg +val deltaY = y - yAvg +count += 1 +xAvg += deltaX / count +yAvg += deltaY / count +Ck += deltaX * (y - yAvg) +MkX += deltaX * (x - xAvg) +MkY += deltaY * (y - yAvg) + +buffer.setDouble(mutableAggBufferOffset, xAvg) +buffer.setDouble(mutableAggBufferOffset + 1, yAvg) +buffer.setDouble(mutableAggBufferOffset + 2, Ck) +buffer.setDouble(mutableAggBufferOffset + 3, MkX) +buffer.setDouble(mutableAggBufferOffset + 4, MkY) +buffer.setLong(mutableAggBufferOffset + 5, count) + } + + // Merge counters from other partitions. Formula can be found at: + // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance + override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = { +val count2 = buffer2.getLong(inputAggBufferOffset + 5) + +if (count2 > 0) { --- End diff -- Thanks for the comment. Now it is obvious, I wasn't thinking... 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9298][SQL] Add pearson correlation aggr...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/8587#discussion_r43162271 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala --- @@ -524,6 +525,133 @@ case class Sum(child: Expression) extends DeclarativeAggregate { override val evaluateExpression = Cast(currentSum, resultType) } +/** + * Compute Pearson correlation between two expressions. + * When applied on empty data (i.e., count is zero), it returns NaN. + * + * Definition of Pearson correlation can be found at + * http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient + * + * @param left one of the expressions to compute correlation with. + * @param right another expression to compute correlation with. + */ +case class Corr( +left: Expression, +right: Expression, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) + extends ImperativeAggregate { + + def children: Seq[Expression] = Seq(left, right) + + def nullable: Boolean = false + + def dataType: DataType = DoubleType + + def inputTypes: Seq[AbstractDataType] = Seq(DoubleType) + + def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes) + + def inputAggBufferAttributes: Seq[AttributeReference] = aggBufferAttributes.map(_.newInstance()) + + val aggBufferAttributes: Seq[AttributeReference] = Seq( +AttributeReference("xAvg", DoubleType)(), +AttributeReference("yAvg", DoubleType)(), +AttributeReference("Ck", DoubleType)(), +AttributeReference("MkX", DoubleType)(), +AttributeReference("MkY", DoubleType)(), +AttributeReference("count", LongType)()) + + override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate = +copy(mutableAggBufferOffset = newMutableAggBufferOffset) + + override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = +copy(inputAggBufferOffset = newInputAggBufferOffset) + + override def initialize(buffer: MutableRow): Unit = { +(0 until 5).map(idx => buffer.setDouble(mutableAggBufferOffset + idx, 0.0)) +buffer.setLong(mutableAggBufferOffset + 5, 0L) + } + + override def update(buffer: MutableRow, input: InternalRow): Unit = { +val x = left.eval(input).asInstanceOf[Double] +val y = right.eval(input).asInstanceOf[Double] + +var xAvg = buffer.getDouble(mutableAggBufferOffset) +var yAvg = buffer.getDouble(mutableAggBufferOffset + 1) +var Ck = buffer.getDouble(mutableAggBufferOffset + 2) +var MkX = buffer.getDouble(mutableAggBufferOffset + 3) +var MkY = buffer.getDouble(mutableAggBufferOffset + 4) +var count = buffer.getLong(mutableAggBufferOffset + 5) + +val deltaX = x - xAvg +val deltaY = y - yAvg +count += 1 +xAvg += deltaX / count +yAvg += deltaY / count +Ck += deltaX * (y - yAvg) +MkX += deltaX * (x - xAvg) +MkY += deltaY * (y - yAvg) + +buffer.setDouble(mutableAggBufferOffset, xAvg) +buffer.setDouble(mutableAggBufferOffset + 1, yAvg) +buffer.setDouble(mutableAggBufferOffset + 2, Ck) +buffer.setDouble(mutableAggBufferOffset + 3, MkX) +buffer.setDouble(mutableAggBufferOffset + 4, MkY) +buffer.setLong(mutableAggBufferOffset + 5, count) + } + + // Merge counters from other partitions. Formula can be found at: + // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance + override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = { +val count2 = buffer2.getLong(inputAggBufferOffset + 5) + +if (count2 > 0) { --- End diff -- Is it safe to assume that the ```count2``` in ```buffer1``` is non zero? 
There is - currently - no documentation on this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
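For context on the question above, here is a standalone sketch of the standard pairwise merge for such correlation buffers, following the well-known parallel-update formulas from the Wikipedia article referenced in the diff. It is my own illustration, not the code from the PR; it also shows why the zero-count cases need care: with both counts zero the division by n is undefined, while a single empty side simply reduces to the other side's statistics.

```scala
case class CorrBuffer(xAvg: Double, yAvg: Double, ck: Double, mkX: Double, mkY: Double, count: Long)

def merge(b1: CorrBuffer, b2: CorrBuffer): CorrBuffer = {
  if (b2.count == 0) b1
  else if (b1.count == 0) b2
  else {
    val n = b1.count + b2.count
    val dx = b2.xAvg - b1.xAvg
    val dy = b2.yAvg - b1.yAvg
    val w = b1.count.toDouble * b2.count / n
    CorrBuffer(
      xAvg = b1.xAvg + dx * b2.count / n,
      yAvg = b1.yAvg + dy * b2.count / n,
      ck = b1.ck + b2.ck + dx * dy * w,      // merged co-moment
      mkX = b1.mkX + b2.mkX + dx * dx * w,   // merged sum of squared deviations of x
      mkY = b1.mkY + b2.mkY + dy * dy * w,   // merged sum of squared deviations of y
      count = n)
  }
}
```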
[GitHub] spark pull request: [SPARK-11388][Build]Fix self closing tags.
GitHub user hvanhovell opened a pull request: https://github.com/apache/spark/pull/9339 [SPARK-11388][Build]Fix self closing tags. Java 8 javadoc does not like self-closing tags; this PR fixes those. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hvanhovell/spark SPARK-11388 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/9339.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #9339 commit 7d8052830cdf6456e4a8e3233c943bccf595dc9d Author: Herman van Hovell <hvanhov...@questtec.nl> Date: 2015-10-28T21:00:09Z Fix self closing tags. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11388][Build]Fix self closing tags.
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/9339#issuecomment-152135261 jenkins retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11388][Build]Fix self closing tags.
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/9339#issuecomment-152135237 Hmmm, test fails with the following beautiful error: [error] (core/test:test) sbt.TestsFailedException: Tests unsuccessful [error] Total time: 4809 s, completed Oct 29, 2015 2:32:35 AM Retesting. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9241] [SQL] [WIP] Supporting multiple D...
GitHub user hvanhovell opened a pull request: https://github.com/apache/spark/pull/9280 [SPARK-9241] [SQL] [WIP] Supporting multiple DISTINCT columns

This PR adds support for multiple distinct columns to the new aggregation code path. The implementation uses the ```OpenHashSet``` class and set expressions. As a result we can only use the slower sort based aggregation code path. This also means the code will probably be slower than the old hash aggregation. The PR is currently in the proof of concept phase, and I have submitted it to get some feedback to see if I am headed in the right direction. I'll add more tests if this is considered to be the way to go.

An example using the new code path:

```scala
val df = sqlContext
  .range(1 << 25)
  .select(
    $"id".as("employee_id"),
    (rand(6321782L) * 4 + 1).cast("int").as("department_id"),
    when(rand(981293L) >= 0.5, "M").otherwise("F").as("gender"),
    (rand(7123L) * 3 + 1).cast("int").as("education_level")
  )
df.registerTempTable("employee")

// Regular query.
sql("""
  select department_id as d,
         count(distinct gender, education_level) as c0,
         count(distinct gender) as c1,
         count(distinct education_level) as c2
  from employee
  group by department_id
""").show()
```

cc @yhuai

You can merge this pull request into a Git repository by running: $ git pull https://github.com/hvanhovell/spark SPARK-9241 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/9280.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #9280 commit 256e1f6902b8adbc304c6e287d7cfdf2ef97b12b Author: Herman van Hovell <hvanhov...@questtec.nl> Date: 2015-10-26T12:46:33Z Created distinct fallback mechanism. commit 6a87384de8d934327ead72daf7210e29be8687b6 Author: Herman van Hovell <hvanhov...@questtec.nl> Date: 2015-10-26T13:35:01Z Added fallback distinct creation to aggregate conversion. commit 3bd6db5390dee044ab4673e38329f584b0436a66 Author: Herman van Hovell <hvanhov...@questtec.nl> Date: 2015-10-26T15:07:22Z Fix style. Fix CG for OpenHashSetUDT. Fix bug.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11594][SQL][REPL] Cannot create UDAF in...
GitHub user hvanhovell opened a pull request: https://github.com/apache/spark/pull/9568 [SPARK-11594][SQL][REPL] Cannot create UDAF in REPL This PR enables users to create a UDAF in the REPL without getting a ```java.lang.InternalError```. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hvanhovell/spark SPARK-11594 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/9568.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #9568 commit 401230c51ad93425c90ec3c58746dbfba154410c Author: Herman van Hovell <hvanhov...@questtec.nl> Date: 2015-11-09T13:21:29Z Enable the creation of a UDAF in the REPL. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9830] [SQL] Remove AggregateExpression1...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/9556#discussion_r44269743 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala --- @@ -141,40 +141,46 @@ class WindowSpec private[sql]( */ private[sql] def withAggregate(aggregate: Column): Column = { val windowExpr = aggregate.expr match { - case Average(child) => WindowExpression( -UnresolvedWindowFunction("avg", child :: Nil), -WindowSpecDefinition(partitionSpec, orderSpec, frame)) - case Sum(child) => WindowExpression( -UnresolvedWindowFunction("sum", child :: Nil), -WindowSpecDefinition(partitionSpec, orderSpec, frame)) - case Count(child) => WindowExpression( -UnresolvedWindowFunction("count", child :: Nil), -WindowSpecDefinition(partitionSpec, orderSpec, frame)) - case First(child, ignoreNulls) => WindowExpression( -// TODO this is a hack for Hive UDAF first_value -UnresolvedWindowFunction( - "first_value", - child :: ignoreNulls :: Nil), -WindowSpecDefinition(partitionSpec, orderSpec, frame)) - case Last(child, ignoreNulls) => WindowExpression( -// TODO this is a hack for Hive UDAF last_value -UnresolvedWindowFunction( - "last_value", - child :: ignoreNulls :: Nil), -WindowSpecDefinition(partitionSpec, orderSpec, frame)) - case Min(child) => WindowExpression( -UnresolvedWindowFunction("min", child :: Nil), -WindowSpecDefinition(partitionSpec, orderSpec, frame)) - case Max(child) => WindowExpression( -UnresolvedWindowFunction("max", child :: Nil), -WindowSpecDefinition(partitionSpec, orderSpec, frame)) - case wf: WindowFunction => WindowExpression( -wf, -WindowSpecDefinition(partitionSpec, orderSpec, frame)) + case AggregateExpression(aggregateFunction, _, isDistinct) if !isDistinct => +aggregateFunction match { --- End diff -- Window functions also need to be wrapped in an ```AggregateExpression``` in ```functions.scala``` for this to work. It currently fails HiveDataFrameWindowSuite due to this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11594][SQL][REPL] Cannot create UDAF in...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/9568#discussion_r44273796

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala ---
@@ -129,6 +129,13 @@ abstract class UserDefinedAggregateFunction extends Serializable {
       isDistinct = true)
     Column(aggregateExpression)
   }
+
+  /**
+   * The name of the UDAF. This is currently the simple name of class. This can create an
+   * [[java.lang.InternalError]] if the UDAF class was created in the REPL; override this method
+   * in these cases.
+   */
+  def name: String = getClass.getSimpleName

--- End diff --

I am not catching a ```java.lang.InternalError``` here, because I think these shouldn't be caught at all (the error could have a different cause). We enable the user to redefine the name to work around this problem.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9830] [SQL] Remove AggregateExpression1...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/9556#issuecomment-155054384 Made a quick pass. I think the PR is in good shape. I do have a few additional questions/remarks:
* Maybe we should add some documentation to ```DeclarativeAggregate``` to explain why developers have to use lazy vals for all expressions involving the child expression(s).
* ```MultipleDistinctRewriter``` is in a 'funny' file/location. Shouldn't we move it to the analyzer?

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11594][SQL][REPL] Cannot create UDAF in...
Github user hvanhovell closed the pull request at: https://github.com/apache/spark/pull/9568 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11594][SQL][REPL] Cannot create UDAF in...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/9568#issuecomment-155710292 Moving to Scala 2.10.5 fixed this. Closing the PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11594][SQL][REPL] Cannot create UDAF in...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/9568#discussion_r44469882 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala --- @@ -452,7 +452,7 @@ private[sql] case class ScalaUDAF( } override def toString: String = { -s"""${udaf.getClass.getSimpleName}(${children.mkString(",")})""" --- End diff -- The ```getSimpleName``` call on the defined class causes the internal error. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [WIP] [SPARK-11636] [SQL] Support as for class...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/9602#issuecomment-155576674 Does a REPL defined class blow up with a ```java.lang.InternalError```? If it does, then we have the same problem: https://github.com/apache/spark/pull/9568 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9241][SQL] Supporting multiple DISTINCT...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/9566#issuecomment-155578114 I'll rebase. The potential bug is that the attributes for the distinct columns and the expressions for the distinct columns can get misaligned. The code currently relies on a sequence maintaining the same order as a (hash) map derived from it; this happens to work, but is not guaranteed by the Scala collections library. I'd rather fix this than wait for something to go wrong. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
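A small standalone illustration of the ordering pitfall described above (plain Scala with hypothetical names, not the PR code): deriving a Map from a Seq and then iterating the Map does not have to preserve the Seq's order, so pairing the original Seq with values taken in the Map's iteration order can silently misalign.

```scala
val children = Seq("gender", "education_level", "department_id", "value1", "value2")

// A Map derived from the Seq; for a HashMap the iteration order is unspecified.
val attrByChild = children.map(c => c -> s"attr_$c").toMap
val attrsInMapOrder = attrByChild.values.toSeq   // may not line up with `children`

// Looking the attribute up per child keeps the pairing correct regardless of Map order.
val aligned = children.map(child => child -> attrByChild(child))
```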
[GitHub] spark pull request: [SPARK-9241][SQL] Supporting multiple DISTINCT...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/9566#discussion_r44514711

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala ---
@@ -545,19 +576,21 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
          | count(distinct value2),
          | sum(distinct value2),
          | count(distinct value1, value2),
+         | longProductSum(distinct value1, value2),

--- End diff --

Yes and No. The input we care about only consists of these tuples: ```[value1=null, value2=null], [value1=null, value2=1], [value1=1, value2=null], and [value1=1, value2=1]``` However, in the current implementation a distinct aggregate will see more input than those. It will also see records from other groups. However, the values in these records are nulled out. The assumption here is that an AggregateFunction is not changed by an all-NULL update. The only case I can think of that would be problematic is a ```FIRST(DISTINCT ...)```, which shouldn't be used like that anyway. We could solve this by wrapping AggregateFunctions with an operator which will only update if the group id is correct.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9241][SQL] Supporting multiple DISTINCT...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/9566#discussion_r44472817 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala --- @@ -151,11 +151,12 @@ case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalP } // Setup unique distinct aggregate children. - val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq - val distinctAggChildAttrMap = distinctAggChildren.map(expressionAttributePair).toMap --- End diff -- By calling ```toMap``` I am potentially breaking the alignment between ```distinctAggChildren``` and ```distinctAggChildAttrs```. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11594][SQL][REPL] Cannot create UDAF in...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/9568#issuecomment-155589310 Sounds like a good idea. I'll add this in the morning. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11594][SQL][REPL] Cannot create UDAF in...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/9568#issuecomment-155582285 This is actually a scala problem: https://issues.scala-lang.org/browse/SI-9051 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11451][SQL] Support single distinct cou...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/9409#issuecomment-154825955 retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11451][SQL] Support single distinct cou...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/9409#issuecomment-154826382 Jenkins does not like me... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11451][SQL] Support single distinct cou...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/9409#issuecomment-154831183 @yhuai can you get jenkins to test this? The bug exposed by this patch affected the regular aggregation path: as soon as we used more than one regular aggregate, there was a chance that an attribute and its source expression got misaligned. This has been fixed, and I have also added a test for this situation. If we choose not to add this to the 1.6 branch, then we have to create a separate PR containing only the bugfix and get that one in. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11451][SQL] Support single distinct cou...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/9409#issuecomment-154825193 test this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9830] [SQL] Remove AggregateExpression1...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/9556#issuecomment-154984531 retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11553] [SQL] Primitive Row accessors sh...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/9642#issuecomment-156693107 ```UnsafeRow``` and ```SpecificRow``` have similar problems. Shouldn't we fix those as well? For example:

```scala
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

val srow = new SpecificMutableRow(IntegerType :: Nil)
srow.isNullAt(0)
srow.getInt(0)

val urow = new UnsafeRow()
urow.pointTo(new Array[Byte](16), 1, 4)
urow.isNullAt(0)
urow.getInt(0)

// Result:
// import org.apache.spark.sql.types.IntegerType
// import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
// import org.apache.spark.sql.catalyst.expressions.UnsafeRow
// srow: org.apache.spark.sql.catalyst.expressions.SpecificMutableRow = [null]
// res129: Boolean = true
// res130: Int = 0
// urow: org.apache.spark.sql.catalyst.expressions.UnsafeRow = []
// res133: Boolean = false
// res134: Int = 0
```

I'd actually rather not touch this at all. When you are using an internal API you should be more careful and expect some quirkiness. I can currently think of only one place in which this causes some problems: UDFs with primitive parameters. The engine will pass in default values instead of nulls. Are there any other situations in which this causes problems?

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11553] [SQL] Primitive Row accessors sh...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/9642#issuecomment-156697230

> You mean discard change and only update documentation?

Yes, this is one option - I even think that it is not so bad. I would only update the documentation. Internal mutable rows are among the most performance critical classes in Spark SQL, so I am not that keen to add (potentially unnecessary) branching to every primitive getter. When someone is using ```InternalRow```s he or she is using a SparkSQL internal API anyway, and really should know what (s)he is doing.

> You mean problem caused by not introducing this change?

Yes, do you know any other problems caused by this?

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
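For reference, the kind of branching being weighed in the comment above, as a small hypothetical sketch (illustrative trait and method names, not the actual InternalRow API): every primitive accessor would pay one extra null check so that reading a NULL slot fails loudly instead of returning the type's default value.

```scala
trait GuardedIntAccess {
  def isNullAt(ordinal: Int): Boolean
  protected def getIntUnchecked(ordinal: Int): Int

  // The extra branch under discussion: fail fast on NULL instead of returning 0.
  def getInt(ordinal: Int): Int = {
    if (isNullAt(ordinal)) {
      throw new NullPointerException(s"Value at ordinal $ordinal is NULL")
    }
    getIntUnchecked(ordinal)
  }
}
```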
[GitHub] spark pull request: [SPARK-9830] [SQL] Remove AggregateExpression1...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/9556#issuecomment-155225000 PySpark results are correct, just not in the correct form :(... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9830] [SQL] Remove AggregateExpression1...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/9556#issuecomment-155224746 LGTM, pending a successful test build.
[GitHub] spark pull request: [SPARK-9830] [SQL] Remove AggregateExpression1...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/9556#discussion_r44343490

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---

```diff
@@ -146,148 +146,105 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }

-  object HashAggregation extends Strategy {
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      // Aggregations that can be performed in two phases, before and after the shuffle.
-      case PartialAggregation(
-          namedGroupingAttributes,
-          rewrittenAggregateExpressions,
-          groupingExpressions,
-          partialComputation,
-          child) if !canBeConvertedToNewAggregation(plan) =>
-        execution.Aggregate(
-          partial = false,
-          namedGroupingAttributes,
-          rewrittenAggregateExpressions,
-          execution.Aggregate(
-            partial = true,
-            groupingExpressions,
-            partialComputation,
-            planLater(child))) :: Nil
-
-      case _ => Nil
-    }
-
-    def canBeConvertedToNewAggregation(plan: LogicalPlan): Boolean = plan match {
-      case a: logical.Aggregate =>
-        if (sqlContext.conf.useSqlAggregate2 && sqlContext.conf.codegenEnabled) {
-          a.newAggregation.isDefined
-        } else {
-          Utils.checkInvalidAggregateFunction2(a)
-          false
-        }
-      case _ => false
-    }
-
-    def allAggregates(exprs: Seq[Expression]): Seq[AggregateExpression1] =
-      exprs.flatMap(_.collect { case a: AggregateExpression1 => a })
-  }
-
   /**
    * Used to plan the aggregate operator for expressions based on the AggregateFunction2 interface.
    */
   object Aggregation extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case p: logical.Aggregate if sqlContext.conf.useSqlAggregate2 &&
-          sqlContext.conf.codegenEnabled =>
-        val converted = p.newAggregation
-        converted match {
-          case None => Nil // Cannot convert to new aggregation code path.
-          case Some(logical.Aggregate(groupingExpressions, resultExpressions, child)) =>
-            // A single aggregate expression might appear multiple times in resultExpressions.
-            // In order to avoid evaluating an individual aggregate function multiple times, we'll
-            // build a set of the distinct aggregate expressions and build a function which can
-            // be used to re-write expressions so that they reference the single copy of the
-            // aggregate function which actually gets computed.
-            val aggregateExpressions = resultExpressions.flatMap { expr =>
-              expr.collect {
-                case agg: AggregateExpression2 => agg
-              }
-            }.distinct
-            // For those distinct aggregate expressions, we create a map from the
-            // aggregate function to the corresponding attribute of the function.
-            val aggregateFunctionToAttribute = aggregateExpressions.map { agg =>
-              val aggregateFunction = agg.aggregateFunction
-              val attribute = Alias(aggregateFunction, aggregateFunction.toString)().toAttribute
-              (aggregateFunction, agg.isDistinct) -> attribute
-            }.toMap
-
-            val (functionsWithDistinct, functionsWithoutDistinct) =
-              aggregateExpressions.partition(_.isDistinct)
-            if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) {
-              // This is a sanity check. We should not reach here when we have multiple distinct
-              // column sets (aggregate.NewAggregation will not match).
-              sys.error(
-                "Multiple distinct column sets are not supported by the new aggregation" +
-                  "code path.")
-            }
+      case logical.Aggregate(groupingExpressions, resultExpressions, child) =>
+        // A single aggregate expression might appear multiple times in resultExpressions.
+        // In order to avoid evaluating an individual aggregate function multiple times, we'll
+        // build a set of the distinct aggregate expressions and build a function which can
+        // be used to re-write expressions so that they reference the single copy of the
+        // aggregate function which actually gets computed.
+        val aggregateExpressions = resultExpressions.flatMap { expr =>
+          expr.collect {
```
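To make the comments in the diff concrete: in a query like the one below, the same aggregate appears twice in the result expressions, so the strategy collects the distinct aggregate expressions, evaluates each once, and rewrites every occurrence to the single computed attribute (illustrative query only, assuming an active `sqlContext` with a registered `sales` table).

```scala
// Illustrative only: sum(amount) occurs twice in the result expressions of this
// query; it is computed once and referenced twice in the final projection.
val result = sqlContext.sql(
  """SELECT store,
    |       sum(amount)            AS total,
    |       sum(amount) / count(*) AS avg_per_row
    |FROM sales
    |GROUP BY store""".stripMargin)
```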
[GitHub] spark pull request: [SPARK-9241][SQL] Supporting multiple DISTINCT...
GitHub user hvanhovell opened a pull request: https://github.com/apache/spark/pull/9566 [SPARK-9241][SQL] Supporting multiple DISTINCT columns - follow-up (3)

This PR is a second follow-up for [SPARK-9241](https://issues.apache.org/jira/browse/SPARK-9241). It contains the following improvements:

* Fix for a potential bug in distinct child expression and attribute alignment.
* Improved handling of duplicate distinct child expressions.
* Added test for a distinct UDAF with multiple children.

cc @yhuai

You can merge this pull request into a Git repository by running: $ git pull https://github.com/hvanhovell/spark SPARK-9241-followup-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/9566.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #9566

commit ea4ea69ca4980163c9a31aa4f65b6db6121d62e3 Author: Herman van Hovell <hvanhov...@questtec.nl> Date: 2015-11-09T11:44:26Z Fix potential bug in distinct child expression and attribute alignment & improve handling of duplicate distinct child expressions. Added test for distinct UDAF with multiple children.
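As a rough illustration of the cases this follow-up targets (hypothetical queries, not taken from the PR itself): a distinct aggregate whose child expressions contain duplicates, and a distinct aggregate over multiple children.

```scala
// Hypothetical examples only, assuming an active sqlContext with a table t(a, b):
//  - count(DISTINCT a, a): duplicate distinct child expressions, which must be
//    deduplicated without misaligning the children and their attributes;
//  - count(DISTINCT a, b): a distinct aggregate with multiple children (the PR
//    adds a similar test that uses a UDAF instead of count).
sqlContext.sql("SELECT count(DISTINCT a, a), count(DISTINCT a, b) FROM t")
```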