Repository: spark Updated Branches: refs/heads/master 4b9542e3a -> 553af22f2
[SPARK-16323][SQL] Add IntegralDivide expression ## What changes were proposed in this pull request? The PR takes over #14036 and it introduces a new expression `IntegralDivide` in order to avoid the several unneeded casts added previously. In order to prove the performance gain, the following benchmark has been run: ``` test("Benchmark IntegralDivide") { val r = new scala.util.Random(91) val nData = 1000000 val testDataInt = (1 to nData).map(_ => (r.nextInt(), r.nextInt())) val testDataLong = (1 to nData).map(_ => (r.nextLong(), r.nextLong())) val testDataShort = (1 to nData).map(_ => (r.nextInt().toShort, r.nextInt().toShort)) // old code val oldExprsInt = testDataInt.map(x => Cast(Divide(Cast(Literal(x._1), DoubleType), Cast(Literal(x._2), DoubleType)), LongType)) val oldExprsLong = testDataLong.map(x => Cast(Divide(Cast(Literal(x._1), DoubleType), Cast(Literal(x._2), DoubleType)), LongType)) val oldExprsShort = testDataShort.map(x => Cast(Divide(Cast(Literal(x._1), DoubleType), Cast(Literal(x._2), DoubleType)), LongType)) // new code val newExprsInt = testDataInt.map(x => IntegralDivide(x._1, x._2)) val newExprsLong = testDataLong.map(x => IntegralDivide(x._1, x._2)) val newExprsShort = testDataShort.map(x => IntegralDivide(x._1, x._2)) Seq(("Long", "old", oldExprsLong), ("Long", "new", newExprsLong), ("Int", "old", oldExprsInt), ("Int", "new", newExprsInt), ("Short", "old", oldExprsShort), ("Short", "new", newExprsShort)).foreach { case (dt, t, ds) => val start = System.nanoTime() ds.foreach(e => e.eval(EmptyRow)) val endNoCodegen = System.nanoTime() println(s"Running $nData op with $t code on $dt (no-codegen): ${(endNoCodegen - start) / 1000000} ms") } } ``` The results on my laptop are: ``` Running 1000000 op with old code on Long (no-codegen): 600 ms Running 1000000 op with new code on Long (no-codegen): 112 ms Running 1000000 op with old code on Int (no-codegen): 560 ms Running 1000000 op with new code on Int (no-codegen): 135 ms Running 1000000 op with 
old code on Short (no-codegen): 317 ms Running 1000000 op with new code on Short (no-codegen): 153 ms ``` Showing a 2-5X improvement. The benchmark doesn't include code generation as it is pretty hard to test the performance there as for such simple operations most of the time is spent in the code generation/compilation process. ## How was this patch tested? added UTs Closes #22395 from mgaido91/SPARK-16323. Authored-by: Marco Gaido <marcogaid...@gmail.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/553af22f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/553af22f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/553af22f Branch: refs/heads/master Commit: 553af22f2c8ecdc039c8d06431564b1432e60d2d Parents: 4b9542e Author: Marco Gaido <marcogaid...@gmail.com> Authored: Mon Sep 17 11:33:50 2018 -0700 Committer: Dongjoon Hyun <dongj...@apache.org> Committed: Mon Sep 17 11:33:50 2018 -0700 ---------------------------------------------------------------------- .../catalyst/analysis/FunctionRegistry.scala | 1 + .../apache/spark/sql/catalyst/dsl/package.scala | 1 + .../sql/catalyst/expressions/arithmetic.scala | 28 ++++++++++++++++++++ .../spark/sql/catalyst/parser/AstBuilder.scala | 2 +- .../expressions/ArithmeticExpressionSuite.scala | 18 ++++++------- .../catalyst/parser/ExpressionParserSuite.scala | 4 +-- .../sql-tests/results/operators.sql.out | 8 +++--- 7 files changed, 45 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/553af22f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 77860e1..8b69a47 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -267,6 +267,7 @@ object FunctionRegistry { expression[Subtract]("-"), expression[Multiply]("*"), expression[Divide]("/"), + expression[IntegralDivide]("div"), expression[Remainder]("%"), // aggregate functions http://git-wip-us.apache.org/repos/asf/spark/blob/553af22f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index d3ccd18..176ea82 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -72,6 +72,7 @@ package object dsl { def - (other: Expression): Expression = Subtract(expr, other) def * (other: Expression): Expression = Multiply(expr, other) def / (other: Expression): Expression = Divide(expr, other) + def div (other: Expression): Expression = IntegralDivide(expr, other) def % (other: Expression): Expression = Remainder(expr, other) def & (other: Expression): Expression = BitwiseAnd(expr, other) def | (other: Expression): Expression = BitwiseOr(expr, other) http://git-wip-us.apache.org/repos/asf/spark/blob/553af22f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala index 
c827226..1b1808f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala @@ -314,6 +314,34 @@ case class Divide(left: Expression, right: Expression) extends DivModLike { override def evalOperation(left: Any, right: Any): Any = div(left, right) } +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "expr1 _FUNC_ expr2 - Divide `expr1` by `expr2` rounded to the long integer. It returns NULL if an operand is NULL or `expr2` is 0.", + examples = """ + Examples: + > SELECT 3 _FUNC_ 2; + 1 + """, + since = "2.5.0") +// scalastyle:on line.size.limit +case class IntegralDivide(left: Expression, right: Expression) extends DivModLike { + + override def inputType: AbstractDataType = IntegralType + override def dataType: DataType = LongType + + override def symbol: String = "/" + override def sqlOperator: String = "div" + + private lazy val div: (Any, Any) => Long = left.dataType match { + case i: IntegralType => + val divide = i.integral.asInstanceOf[Integral[Any]].quot _ + val toLong = i.integral.asInstanceOf[Integral[Any]].toLong _ + (x, y) => toLong(divide(x, y)) + } + + override def evalOperation(left: Any, right: Any): Any = div(left, right) +} + @ExpressionDescription( usage = "expr1 _FUNC_ expr2 - Returns the remainder after `expr1`/`expr2`.", examples = """ http://git-wip-us.apache.org/repos/asf/spark/blob/553af22f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 7bc1f63..5cfb5dc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -1157,7 +1157,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging case SqlBaseParser.PERCENT => Remainder(left, right) case SqlBaseParser.DIV => - Cast(Divide(left, right), LongType) + IntegralDivide(left, right) case SqlBaseParser.PLUS => Add(left, right) case SqlBaseParser.MINUS => http://git-wip-us.apache.org/repos/asf/spark/blob/553af22f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ArithmeticExpressionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ArithmeticExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ArithmeticExpressionSuite.scala index 9a752af..c3c4d9e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ArithmeticExpressionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ArithmeticExpressionSuite.scala @@ -143,16 +143,14 @@ class ArithmeticExpressionSuite extends SparkFunSuite with ExpressionEvalHelper } } - // By fixing SPARK-15776, Divide's inputType is required to be DoubleType of DecimalType. - // TODO: in future release, we should add a IntegerDivide to support integral types. 
- ignore("/ (Divide) for integral type") { - checkEvaluation(Divide(Literal(1.toByte), Literal(2.toByte)), 0.toByte) - checkEvaluation(Divide(Literal(1.toShort), Literal(2.toShort)), 0.toShort) - checkEvaluation(Divide(Literal(1), Literal(2)), 0) - checkEvaluation(Divide(Literal(1.toLong), Literal(2.toLong)), 0.toLong) - checkEvaluation(Divide(positiveShortLit, negativeShortLit), 0.toShort) - checkEvaluation(Divide(positiveIntLit, negativeIntLit), 0) - checkEvaluation(Divide(positiveLongLit, negativeLongLit), 0L) + test("/ (Divide) for integral type") { + checkEvaluation(IntegralDivide(Literal(1.toByte), Literal(2.toByte)), 0L) + checkEvaluation(IntegralDivide(Literal(1.toShort), Literal(2.toShort)), 0L) + checkEvaluation(IntegralDivide(Literal(1), Literal(2)), 0L) + checkEvaluation(IntegralDivide(Literal(1.toLong), Literal(2.toLong)), 0L) + checkEvaluation(IntegralDivide(positiveShortLit, negativeShortLit), 0L) + checkEvaluation(IntegralDivide(positiveIntLit, negativeIntLit), 0L) + checkEvaluation(IntegralDivide(positiveLongLit, negativeLongLit), 0L) } test("% (Remainder)") { http://git-wip-us.apache.org/repos/asf/spark/blob/553af22f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala index 781fc1e..b4df22c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala @@ -203,7 +203,7 @@ class ExpressionParserSuite extends PlanTest { // Simple operations assertEqual("a * b", 'a * 'b) assertEqual("a / b", 'a / 'b) - assertEqual("a DIV b", ('a / 'b).cast(LongType)) + assertEqual("a DIV b", 'a div 'b) assertEqual("a % 
b", 'a % 'b) assertEqual("a + b", 'a + 'b) assertEqual("a - b", 'a - 'b) @@ -214,7 +214,7 @@ class ExpressionParserSuite extends PlanTest { // Check precedences assertEqual( "a * t | b ^ c & d - e + f % g DIV h / i * k", - 'a * 't | ('b ^ ('c & ('d - 'e + (('f % 'g / 'h).cast(LongType) / 'i * 'k))))) + 'a * 't | ('b ^ ('c & ('d - 'e + (('f % 'g div 'h) / 'i * 'k))))) } test("unary arithmetic expressions") { http://git-wip-us.apache.org/repos/asf/spark/blob/553af22f/sql/core/src/test/resources/sql-tests/results/operators.sql.out ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/results/operators.sql.out b/sql/core/src/test/resources/sql-tests/results/operators.sql.out index 840655b..2555734 100644 --- a/sql/core/src/test/resources/sql-tests/results/operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/operators.sql.out @@ -157,7 +157,7 @@ NULL -- !query 19 select 5 div 2 -- !query 19 schema -struct<CAST((CAST(5 AS DOUBLE) / CAST(2 AS DOUBLE)) AS BIGINT):bigint> +struct<(5 div 2):bigint> -- !query 19 output 2 @@ -165,7 +165,7 @@ struct<CAST((CAST(5 AS DOUBLE) / CAST(2 AS DOUBLE)) AS BIGINT):bigint> -- !query 20 select 5 div 0 -- !query 20 schema -struct<CAST((CAST(5 AS DOUBLE) / CAST(0 AS DOUBLE)) AS BIGINT):bigint> +struct<(5 div 0):bigint> -- !query 20 output NULL @@ -173,7 +173,7 @@ NULL -- !query 21 select 5 div null -- !query 21 schema -struct<CAST((CAST(5 AS DOUBLE) / CAST(NULL AS DOUBLE)) AS BIGINT):bigint> +struct<(5 div CAST(NULL AS INT)):bigint> -- !query 21 output NULL @@ -181,7 +181,7 @@ NULL -- !query 22 select null div 5 -- !query 22 schema -struct<CAST((CAST(NULL AS DOUBLE) / CAST(5 AS DOUBLE)) AS BIGINT):bigint> +struct<(CAST(NULL AS INT) div 5):bigint> -- !query 22 output NULL --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: 
commits-h...@spark.apache.org