dtenedor commented on code in PR #38419:
URL: https://github.com/apache/spark/pull/38419#discussion_r1073909921


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this expression. The default
+   * behavior is to call the eval method of the expression. Concrete expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {

Review Comment:
   this block is equivalent to `scaleValue.max(0).min(s)`.
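
   For illustration, a minimal sketch of that simplification (assuming the field gets renamed to `scaleValue`; the helper below is hypothetical and only shows the shape):

   ```scala
   // Sketch only, not the PR's code: clamp the requested scale into [0, s]
   // instead of the nested if/else.
   def truncatedDecimalType(p: Int, s: Int, scaleValue: Int): DecimalType = {
     val newPosition = scaleValue.max(0).min(s)
     DecimalType(p - s + newPosition, newPosition)
   }
   ```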



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -1432,6 +1695,53 @@ case class Logarithm(left: Expression, right: Expression)
     newLeft: Expression, newRight: Expression): Logarithm = copy(left = 
newLeft, right = newRight)
 }
 
+trait BaseBinaryExpression extends BinaryExpression
+  with ExpectsInputTypes
+  with Serializable
+  with ImplicitCastInputTypes {
+  val child: Expression
+  val scale: Expression
+  override def left: Expression = child
+  override def right: Expression = scale
+  override def nullable: Boolean = true
+  override def foldable: Boolean = child.foldable
+
+  /**
+   * Expected input types from child expressions. The i-th position in the returned seq indicates
+   * the type requirement for the i-th child.
+   *
+   * The possible values at each position are:
+   *   1. a specific data type, e.g. LongType, StringType.
+   *   2. a non-leaf abstract data type,
+   *      e.g.NumericType, IntegralType, FractionalType.
+   */
+  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType, IntegerType)
+
+  // Avoid repeated evaluation since `scale` is a constant int,
+  // avoid unnecessary `child` evaluation in both codegen and non-codegen eval
+  // by checking if scaleV == null as well.
+  protected lazy val scaleV: Any = scale.eval(EmptyRow)
+
+  protected lazy val _scale: Int = scaleV.asInstanceOf[Int]

Review Comment:
   the `scaleV` doesn't appear to be referenced anywhere else now. Should we just name this `protected lazy val scaleValue` instead to simplify, dropping the leading underscore?
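
   Something like this, as a sketch (assuming `scaleV` really has no remaining callers):

   ```scala
   // Fold the two lazy vals into one and evaluate the constant `scale` expression once.
   protected lazy val scaleValue: Int = scale.eval(EmptyRow).asInstanceOf[Int]
   ```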



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this expression. The default
+   * behavior is to call the eval method of the expression. Concrete expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte

Review Comment:
   can we match on both the `dataType` and the type of `input1` to skip the `asInstanceOf` call? That avoids a potential casting failure and simplifies the code.
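
   As a rough sketch of that shape (reusing the PR's names; the remaining numeric and decimal cases would mirror these):

   ```scala
   // Binding the typed value in the pattern removes the explicit asInstanceOf calls.
   (dataType, input1) match {
     case (ByteType, b: Byte) if _scale <= 0 =>
       TruncNumber.trunc(b.toLong, _scale).toByte
     case (ShortType, s: Short) if _scale <= 0 =>
       TruncNumber.trunc(s.toLong, _scale).toShort
     // ... other cases follow the same pattern ...
     case _ => input1
   }
   ```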



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this expression. The default
+   * behavior is to call the eval method of the expression. Concrete expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, _scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types
+   */
+  def trunc(input: Long, position: Int): Long = {
+    if (position >= 0) {
+      input
+    } else {
+      // position is -ve, truncate the number by absolute value of position
+      // eg: input 123 , scale -2 , result 100
+      val pow = Math.pow(10, Math.abs(position)).toLong
+      (input / pow) * pow
+    }
+  }
+
+  /**
+   * To truncate double and float type

Review Comment:
   please convert each comment to a full sentence with a trailing period?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this expression. The default
+   * behavior is to call the eval method of the expression. Concrete expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, _scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types
+   */
+  def trunc(input: Long, position: Int): Long = {
+    if (position >= 0) {
+      input
+    } else {
+      // position is -ve, truncate the number by absolute value of position
+      // eg: input 123 , scale -2 , result 100
+      val pow = Math.pow(10, Math.abs(position)).toLong
+      (input / pow) * pow
+    }
+  }
+
+  /**
+   * To truncate double and float type
+   */
+  def trunc(input: Double, position: Int): BigDecimal = {
+    trunc(jm.BigDecimal.valueOf(input), position)
+  }
+
+  /**
+   * To truncate decimal type
+   */
+  def trunc(input: jm.BigDecimal, position: Int): jm.BigDecimal = {
+    if (input.scale < position) {
+      input
+    } else {
+      val wholePart = input.toBigInteger
+      if (position > 0) {
+        // position is +ve , truncate only the decimal part by value of position

Review Comment:
   ```suggestion
           // Here we truncate only the decimal part by the value of the position.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this expression. The default
+   * behavior is to call the eval method of the expression. Concrete expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, _scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types
+   */
+  def trunc(input: Long, position: Int): Long = {
+    if (position >= 0) {
+      input
+    } else {
+      // position is -ve, truncate the number by absolute value of position
+      // eg: input 123 , scale -2 , result 100
+      val pow = Math.pow(10, Math.abs(position)).toLong
+      (input / pow) * pow
+    }
+  }
+
+  /**
+   * To truncate double and float type
+   */
+  def trunc(input: Double, position: Int): BigDecimal = {
+    trunc(jm.BigDecimal.valueOf(input), position)
+  }
+
+  /**
+   * To truncate decimal type
+   */
+  def trunc(input: jm.BigDecimal, position: Int): jm.BigDecimal = {
+    if (input.scale < position) {
+      input
+    } else {
+      val wholePart = input.toBigInteger
+      if (position > 0) {
+        // position is +ve , truncate only the decimal part by value of position
+        val pow = jm.BigDecimal.valueOf(Math.pow(10, position).toLong)
+        val decimalPart = input.remainder(java.math.BigDecimal.ONE)
+        // To avoid overflow while multiplication, extract decimal part first,
+        // truncate it and then add it to whole part
+        // eg: input 123.456, scale 2, result 123.45
+        if (jm.BigDecimal.ZERO.compareTo(decimalPart) == 0) {
+          new jm.BigDecimal(wholePart)
+        } else {
+          val newRemainder = new jm.BigDecimal(decimalPart.multiply(pow).toBigInteger).divide(pow)
+          new jm.BigDecimal(wholePart).add(newRemainder)
+        }
+      } else if (position == 0) {
+        // position is 0, extract whole part

Review Comment:
   ```suggestion
           // The position is zero, so we extract the whole part.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this expression. The default
+   * behavior is to call the eval method of the expression. Concrete expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, _scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types

Review Comment:
   ```suggestion
      * To truncate whole numbers; byte, short, int, and long types.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this expression. The default
+   * behavior is to call the eval method of the expression. Concrete expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, _scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types
+   */
+  def trunc(input: Long, position: Int): Long = {
+    if (position >= 0) {
+      input
+    } else {
+      // position is -ve, truncate the number by absolute value of position
+      // eg: input 123 , scale -2 , result 100
+      val pow = Math.pow(10, Math.abs(position)).toLong
+      (input / pow) * pow
+    }
+  }
+
+  /**
+   * To truncate double and float type
+   */
+  def trunc(input: Double, position: Int): BigDecimal = {
+    trunc(jm.BigDecimal.valueOf(input), position)
+  }
+
+  /**
+   * To truncate decimal type
+   */
+  def trunc(input: jm.BigDecimal, position: Int): jm.BigDecimal = {
+    if (input.scale < position) {
+      input
+    } else {
+      val wholePart = input.toBigInteger
+      if (position > 0) {
+        // position is +ve , truncate only the decimal part by value of position
+        val pow = jm.BigDecimal.valueOf(Math.pow(10, position).toLong)
+        val decimalPart = input.remainder(java.math.BigDecimal.ONE)
+        // To avoid overflow while multiplication, extract decimal part first,
+        // truncate it and then add it to whole part
+        // eg: input 123.456, scale 2, result 123.45

Review Comment:
   ```suggestion
           // For example, if the input is 123.456 and the scale is 2, the result should be 123.45.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this expression. The default
+   * behavior is to call the eval method of the expression. Concrete expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, _scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types
+   */
+  def trunc(input: Long, position: Int): Long = {
+    if (position >= 0) {
+      input
+    } else {
+      // position is -ve, truncate the number by absolute value of position
+      // eg: input 123 , scale -2 , result 100

Review Comment:
   ```suggestion
         // For example, if the input is 123 and the scale is -2, then the result is 100.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this expression. The default
+   * behavior is to call the eval method of the expression. Concrete expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, _scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types
+   */
+  def trunc(input: Long, position: Int): Long = {
+    if (position >= 0) {
+      input
+    } else {
+      // position is -ve, truncate the number by absolute value of position

Review Comment:
   ```suggestion
         // Here we truncate the number by the absolute value of the position.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this expression. The default
+   * behavior is to call the eval method of the expression. Concrete expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, _scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types
+   */
+  def trunc(input: Long, position: Int): Long = {
+    if (position >= 0) {
+      input
+    } else {
+      // position is -ve, truncate the number by absolute value of position
+      // eg: input 123 , scale -2 , result 100
+      val pow = Math.pow(10, Math.abs(position)).toLong
+      (input / pow) * pow
+    }
+  }
+
+  /**
+   * To truncate double and float type
+   */
+  def trunc(input: Double, position: Int): BigDecimal = {
+    trunc(jm.BigDecimal.valueOf(input), position)
+  }
+
+  /**
+   * To truncate decimal type
+   */
+  def trunc(input: jm.BigDecimal, position: Int): jm.BigDecimal = {
+    if (input.scale < position) {
+      input
+    } else {
+      val wholePart = input.toBigInteger
+      if (position > 0) {
+        // position is +ve , truncate only the decimal part by value of position
+        val pow = jm.BigDecimal.valueOf(Math.pow(10, position).toLong)
+        val decimalPart = input.remainder(java.math.BigDecimal.ONE)
+        // To avoid overflow while multiplication, extract decimal part first,
+        // truncate it and then add it to whole part
+        // eg: input 123.456, scale 2, result 123.45
+        if (jm.BigDecimal.ZERO.compareTo(decimalPart) == 0) {
+          new jm.BigDecimal(wholePart)
+        } else {
+          val newRemainder = new jm.BigDecimal(decimalPart.multiply(pow).toBigInteger).divide(pow)
+          new jm.BigDecimal(wholePart).add(newRemainder)
+        }
+      } else if (position == 0) {
+        // position is 0, extract whole part
+        // eg: input 123.456, scale 0, result 123

Review Comment:
   ```suggestion
           // For example, if the input is 123.456 and the scale is 0, the result is 123.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this expression. The default
+   * behavior is to call the eval method of the expression. Concrete expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, _scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types
+   */
+  def trunc(input: Long, position: Int): Long = {
+    if (position >= 0) {
+      input
+    } else {
+      // position is -ve, truncate the number by absolute value of position
+      // eg: input 123 , scale -2 , result 100
+      val pow = Math.pow(10, Math.abs(position)).toLong
+      (input / pow) * pow
+    }
+  }
+
+  /**
+   * To truncate double and float type
+   */
+  def trunc(input: Double, position: Int): BigDecimal = {
+    trunc(jm.BigDecimal.valueOf(input), position)
+  }
+
+  /**
+   * To truncate decimal type
+   */
+  def trunc(input: jm.BigDecimal, position: Int): jm.BigDecimal = {
+    if (input.scale < position) {
+      input
+    } else {
+      val wholePart = input.toBigInteger
+      if (position > 0) {
+        // position is +ve , truncate only the decimal part by value of position
+        val pow = jm.BigDecimal.valueOf(Math.pow(10, position).toLong)
+        val decimalPart = input.remainder(java.math.BigDecimal.ONE)
+        // To avoid overflow while multiplication, extract decimal part first,

Review Comment:
   ```suggestion
        // To avoid overflow during multiplication, we extract the decimal part first,
   ```
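
For readers who want to try the idea in isolation, here is a minimal standalone sketch of the split-then-truncate approach discussed above, written against plain `java.math.BigDecimal` rather than the PR's `TruncNumber` helper; the object and method names are illustrative only and assume `position > 0`.

```scala
import java.math.{BigDecimal => JBigDecimal}

object TruncSketch {
  // Truncate `input` to `position` digits after the decimal point. The fractional
  // part is split off first so that multiplying by 10^position can never overflow
  // the integral digits. `truncateFraction` is an illustrative name, not the PR's API.
  def truncateFraction(input: JBigDecimal, position: Int): JBigDecimal = {
    val wholePart = new JBigDecimal(input.toBigInteger)   // 123.456 -> 123
    val fraction  = input.remainder(JBigDecimal.ONE)      // 123.456 -> 0.456
    val pow       = JBigDecimal.TEN.pow(position)         // position = 2 -> 100
    // 0.456 * 100 = 45.6, keep only the integer digits (45), then scale back to 0.45
    val truncated = new JBigDecimal(fraction.multiply(pow).toBigInteger).divide(pow)
    wholePart.add(truncated)                              // 123 + 0.45 = 123.45
  }

  def main(args: Array[String]): Unit =
    println(truncateFraction(new JBigDecimal("123.456"), 2)) // prints 123.45
}
```
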



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = 
newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this 
expression. The default
+   * behavior is to call the eval method of the expression. Concrete 
expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given 
expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            
s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            
s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            
s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It 
is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of 
BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we 
need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        
Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, 
_scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types
+   */
+  def trunc(input: Long, position: Int): Long = {
+    if (position >= 0) {
+      input
+    } else {
+      // position is -ve, truncate the number by absolute value of position
+      // eg: input 123 , scale -2 , result 100
+      val pow = Math.pow(10, Math.abs(position)).toLong
+      (input / pow) * pow
+    }
+  }
+
+  /**
+   * To truncate double and float type
+   */
+  def trunc(input: Double, position: Int): BigDecimal = {
+    trunc(jm.BigDecimal.valueOf(input), position)
+  }
+
+  /**
+   * To truncate decimal type
+   */
+  def trunc(input: jm.BigDecimal, position: Int): jm.BigDecimal = {
+    if (input.scale < position) {
+      input
+    } else {
+      val wholePart = input.toBigInteger
+      if (position > 0) {
+        // position is +ve , truncate only the decimal part by value of position
+        val pow = jm.BigDecimal.valueOf(Math.pow(10, position).toLong)
+        val decimalPart = input.remainder(java.math.BigDecimal.ONE)
+        // To avoid overflow while multiplication, extract decimal part first,
+        // truncate it and then add it to whole part

Review Comment:
   ```suggestion
           // truncate it and then add the whole part.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = 
newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this 
expression. The default
+   * behavior is to call the eval method of the expression. Concrete 
expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given 
expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            
s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            
s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            
s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It 
is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of 
BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we 
need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        
Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, 
_scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types
+   */
+  def trunc(input: Long, position: Int): Long = {
+    if (position >= 0) {
+      input
+    } else {
+      // position is -ve, truncate the number by absolute value of position
+      // eg: input 123 , scale -2 , result 100
+      val pow = Math.pow(10, Math.abs(position)).toLong
+      (input / pow) * pow
+    }
+  }
+
+  /**
+   * To truncate double and float type
+   */
+  def trunc(input: Double, position: Int): BigDecimal = {
+    trunc(jm.BigDecimal.valueOf(input), position)
+  }
+
+  /**
+   * To truncate decimal type
+   */
+  def trunc(input: jm.BigDecimal, position: Int): jm.BigDecimal = {
+    if (input.scale < position) {
+      input
+    } else {
+      val wholePart = input.toBigInteger
+      if (position > 0) {
+        // position is +ve , truncate only the decimal part by value of 
position
+        val pow = jm.BigDecimal.valueOf(Math.pow(10, position).toLong)
+        val decimalPart = input.remainder(java.math.BigDecimal.ONE)
+        // To avoid overflow while multiplication, extract decimal part first,
+        // truncate it and then add it to whole part
+        // eg: input 123.456, scale 2, result 123.45
+        if (jm.BigDecimal.ZERO.compareTo(decimalPart) == 0) {
+          new jm.BigDecimal(wholePart)
+        } else {
+          val newRemainder = new 
jm.BigDecimal(decimalPart.multiply(pow).toBigInteger).divide(pow)
+          new jm.BigDecimal(wholePart).add(newRemainder)
+        }
+      } else if (position == 0) {
+        // position is 0, extract whole part
+        // eg: input 123.456, scale 0, result 123
+        new jm.BigDecimal(wholePart)
+      } else {
+        // position is -ve, truncate the whole part by absolute value of position
+        // eg: input 123.456, scale -2, result 100

Review Comment:
   ```suggestion
        // For example, if the input is 123.456 and the scale is -2, the result is 100.
   ```
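
For quick experimentation, the negative-scale case boils down to the same divide-then-multiply step used by the whole-number `trunc(Long, Int)` quoted above; the small sketch below restates it standalone, assuming the value fits in a `Long` and that `10^|position|` does not overflow.

```scala
object NegativeScaleTruncSketch {
  // Integer-divide by 10^|position| and multiply back to drop digits left of the
  // decimal point. Illustrative helper, not the PR's code.
  def truncWhole(input: Long, position: Int): Long = {
    val pow = math.pow(10, math.abs(position)).toLong // position = -2 -> 100
    (input / pow) * pow                               // 123 -> 100, -19087 (position -3) -> -19000
  }

  def main(args: Array[String]): Unit = {
    println(truncWhole(123L, -2))    // 100
    println(truncWhole(-19087L, -3)) // -19000
  }
}
```
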



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = 
newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this 
expression. The default
+   * behavior is to call the eval method of the expression. Concrete 
expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given 
expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            
s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            
s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            
s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It 
is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of 
BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we 
need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        
Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, 
_scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types
+   */
+  def trunc(input: Long, position: Int): Long = {
+    if (position >= 0) {
+      input
+    } else {
+      // position is -ve, truncate the number by absolute value of position
+      // eg: input 123 , scale -2 , result 100
+      val pow = Math.pow(10, Math.abs(position)).toLong
+      (input / pow) * pow
+    }
+  }
+
+  /**
+   * To truncate double and float type
+   */
+  def trunc(input: Double, position: Int): BigDecimal = {
+    trunc(jm.BigDecimal.valueOf(input), position)
+  }
+
+  /**
+   * To truncate decimal type
+   */
+  def trunc(input: jm.BigDecimal, position: Int): jm.BigDecimal = {
+    if (input.scale < position) {
+      input
+    } else {
+      val wholePart = input.toBigInteger
+      if (position > 0) {
+        // position is +ve , truncate only the decimal part by value of 
position
+        val pow = jm.BigDecimal.valueOf(Math.pow(10, position).toLong)
+        val decimalPart = input.remainder(java.math.BigDecimal.ONE)
+        // To avoid overflow while multiplication, extract decimal part first,
+        // truncate it and then add it to whole part
+        // eg: input 123.456, scale 2, result 123.45
+        if (jm.BigDecimal.ZERO.compareTo(decimalPart) == 0) {
+          new jm.BigDecimal(wholePart)
+        } else {
+          val newRemainder = new 
jm.BigDecimal(decimalPart.multiply(pow).toBigInteger).divide(pow)
+          new jm.BigDecimal(wholePart).add(newRemainder)
+        }
+      } else if (position == 0) {
+        // position is 0, extract whole part
+        // eg: input 123.456, scale 0, result 123
+        new jm.BigDecimal(wholePart)
+      } else {
+        // position is -ve, truncate the whole part by absolute value of 
position
+        // eg: input 123.456, scale -2, result 100
+        if (jm.BigInteger.ZERO.compareTo(wholePart) == 0) {
+          new jm.BigDecimal(wholePart)
+        } else {
+          val pow = jm.BigInteger.valueOf(Math.pow(10, 
Math.abs(position)).toLong)
+          new jm.BigDecimal(wholePart.divide(pow).multiply(pow), 0)
+        }
+      }
+    }
+  }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """_FUNC_(number[, position]) - Returns the number after truncated 
to the specified places.
+    An optional `position` parameter can be specified to truncate digits to 
the right of the decimal point.
+    If 0, it removes all the decimal values and returns only the integer.
+    If negative, the number is truncated to the left side of the decimal point.
+    There is an overloaded version of this function to truncate date values

Review Comment:
   ```suggestion
    Note that there is an overloaded version of this function to truncate date values.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -1432,6 +1695,53 @@ case class Logarithm(left: Expression, right: Expression)
     newLeft: Expression, newRight: Expression): Logarithm = copy(left = 
newLeft, right = newRight)
 }
 
+trait BaseBinaryExpression extends BinaryExpression
+  with ExpectsInputTypes
+  with Serializable
+  with ImplicitCastInputTypes {
+  val child: Expression
+  val scale: Expression
+  override def left: Expression = child
+  override def right: Expression = scale
+  override def nullable: Boolean = true
+  override def foldable: Boolean = child.foldable
+
+  /**
+   * Expected input types from child expressions. The i-th position in the returned seq indicates
+   * the type requirement for the i-th child.
+   *
+   * The possible values at each position are:
+   *   1. a specific data type, e.g. LongType, StringType.
+   *   2. a non-leaf abstract data type,
+   *      e.g.NumericType, IntegralType, FractionalType.

Review Comment:
   ```suggestion
      *      such as NumericType, IntegralType, FractionalType.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = 
newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this 
expression. The default
+   * behavior is to call the eval method of the expression. Concrete 
expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given 
expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            
s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            
s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            
s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It 
is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of 
BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we 
need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        
Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, 
_scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types
+   */
+  def trunc(input: Long, position: Int): Long = {
+    if (position >= 0) {
+      input
+    } else {
+      // position is -ve, truncate the number by absolute value of position
+      // eg: input 123 , scale -2 , result 100
+      val pow = Math.pow(10, Math.abs(position)).toLong
+      (input / pow) * pow
+    }
+  }
+
+  /**
+   * To truncate double and float type
+   */
+  def trunc(input: Double, position: Int): BigDecimal = {
+    trunc(jm.BigDecimal.valueOf(input), position)
+  }
+
+  /**
+   * To truncate decimal type
+   */
+  def trunc(input: jm.BigDecimal, position: Int): jm.BigDecimal = {
+    if (input.scale < position) {
+      input
+    } else {
+      val wholePart = input.toBigInteger
+      if (position > 0) {
+        // position is +ve , truncate only the decimal part by value of 
position
+        val pow = jm.BigDecimal.valueOf(Math.pow(10, position).toLong)
+        val decimalPart = input.remainder(java.math.BigDecimal.ONE)
+        // To avoid overflow while multiplication, extract decimal part first,
+        // truncate it and then add it to whole part
+        // eg: input 123.456, scale 2, result 123.45
+        if (jm.BigDecimal.ZERO.compareTo(decimalPart) == 0) {
+          new jm.BigDecimal(wholePart)
+        } else {
+          val newRemainder = new 
jm.BigDecimal(decimalPart.multiply(pow).toBigInteger).divide(pow)
+          new jm.BigDecimal(wholePart).add(newRemainder)
+        }
+      } else if (position == 0) {
+        // position is 0, extract whole part
+        // eg: input 123.456, scale 0, result 123
+        new jm.BigDecimal(wholePart)
+      } else {
+        // position is -ve, truncate the whole part by absolute value of 
position
+        // eg: input 123.456, scale -2, result 100
+        if (jm.BigInteger.ZERO.compareTo(wholePart) == 0) {
+          new jm.BigDecimal(wholePart)
+        } else {
+          val pow = jm.BigInteger.valueOf(Math.pow(10, 
Math.abs(position)).toLong)
+          new jm.BigDecimal(wholePart.divide(pow).multiply(pow), 0)
+        }
+      }
+    }
+  }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """_FUNC_(number[, position]) - Returns the number after truncated 
to the specified places.
+    An optional `position` parameter can be specified to truncate digits to 
the right of the decimal point.
+    If 0, it removes all the decimal values and returns only the integer.
+    If negative, the number is truncated to the left side of the decimal point.
+    There is an overloaded version of this function to truncate date values
+    _FUNC_(date, fmt) - Returns `date` with the time portion of the day 
truncated to the unit specified by the format model `fmt`.
+  """,
+  arguments = """
+    Arguments:
+      * number - number to be truncated
+      * position - number of decimal places up to which the given number is to 
be truncated
+    Arguments: To truncate date value:
+      * date - date value or valid date string
+      * fmt - the format representing the unit to be truncated to
+          - "YEAR", "YYYY", "YY" - truncate to the first date of the year that 
the `date` falls in
+          - "QUARTER" - truncate to the first date of the quarter that the 
`date` falls in
+          - "MONTH", "MM", "MON" - truncate to the first date of the month 
that the `date` falls in
+          - "WEEK" - truncate to the Monday of the week that the `date` falls 
in
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_('2019-08-04', 'week');
+       2019-07-29
+      > SELECT _FUNC_('2019-08-04', 'quarter');
+       2019-07-01
+      > SELECT _FUNC_('2009-02-12', 'MM');
+       2009-02-01
+      > SELECT _FUNC_('2015-10-27', 'YEAR');
+       2015-01-01
+      > SELECT _FUNC_(-10.11, 0);
+       -10
+      > SELECT _FUNC_(10.11, -1);
+       10
+      > SELECT _FUNC_(100.61, 0);
+       100
+      > SELECT _FUNC_(-19087.1560, -3);
+       -19000
+      > SELECT _FUNC_(10876.5489, -1);
+       10870
+      > SELECT _FUNC_(-7767.1160, 2);
+       -7767.11
+      > SELECT _FUNC_(17646.6019, 3);
+       17646.601
+  """,
+  since = "3.4.0",
+  group = "math_funcs")
+// scalastyle:on line.size.limit
+object TruncExpressionBuilder extends ExpressionBuilder {
+  override def build(funcName: String, expressions: Seq[Expression]): Expression = {
+    val numArgs = expressions.length
+    if (numArgs >= 1) {
+      expressions(0).dataType match {
+        case ByteType |  ShortType | IntegerType | LongType | FloatType | DoubleType
+             | DecimalType.Fixed(_, _) => buildTruncNumber(funcName, expressions)
+        case _ => buildTruncDate(funcName, expressions)
+      }
+    } else {
+      throw QueryCompilationErrors.invalidFunctionArgumentNumberError(Seq(2), funcName, numArgs)
+    }
+  }
+
+  private def buildTruncDate(funcName: String, expressions: Seq[Expression]) = {
+    val numArgs = expressions.length
+    if (numArgs == 2) {
+      TruncDate(expressions(0), expressions(1))
+    } else {
+      throw QueryCompilationErrors.invalidFunctionArgumentNumberError(Seq(2), funcName, numArgs)

Review Comment:
   same, you can reverse the `if` condition and then de-dent the remainder of the logic
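
A sketch of the early-throw shape the reviewer is suggesting, shown on a generic stand-in method (the names and the exception used here are illustrative, not the PR's helpers):

```scala
object GuardClauseSketch {
  // Reverse the condition, throw first, and keep the happy path de-dented with
  // no trailing else branch.
  def buildPair(funcName: String, args: Seq[String]): (String, String) = {
    if (args.length != 2) {
      throw new IllegalArgumentException(s"$funcName expects 2 arguments, got ${args.length}")
    }
    (args(0), args(1))
  }

  def main(args: Array[String]): Unit =
    println(buildPair("trunc", Seq("123.456", "2"))) // (123.456,2)
}
```
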



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = 
newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this 
expression. The default
+   * behavior is to call the eval method of the expression. Concrete 
expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given 
expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            
s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            
s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            
s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It 
is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of 
BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we 
need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        
Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, 
_scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types
+   */
+  def trunc(input: Long, position: Int): Long = {
+    if (position >= 0) {
+      input
+    } else {
+      // position is -ve, truncate the number by absolute value of position
+      // eg: input 123 , scale -2 , result 100
+      val pow = Math.pow(10, Math.abs(position)).toLong
+      (input / pow) * pow
+    }
+  }
+
+  /**
+   * To truncate double and float type
+   */
+  def trunc(input: Double, position: Int): BigDecimal = {
+    trunc(jm.BigDecimal.valueOf(input), position)
+  }
+
+  /**
+   * To truncate decimal type
+   */
+  def trunc(input: jm.BigDecimal, position: Int): jm.BigDecimal = {
+    if (input.scale < position) {
+      input
+    } else {
+      val wholePart = input.toBigInteger
+      if (position > 0) {
+        // position is +ve , truncate only the decimal part by value of 
position
+        val pow = jm.BigDecimal.valueOf(Math.pow(10, position).toLong)
+        val decimalPart = input.remainder(java.math.BigDecimal.ONE)
+        // To avoid overflow while multiplication, extract decimal part first,
+        // truncate it and then add it to whole part
+        // eg: input 123.456, scale 2, result 123.45
+        if (jm.BigDecimal.ZERO.compareTo(decimalPart) == 0) {
+          new jm.BigDecimal(wholePart)
+        } else {
+          val newRemainder = new 
jm.BigDecimal(decimalPart.multiply(pow).toBigInteger).divide(pow)
+          new jm.BigDecimal(wholePart).add(newRemainder)
+        }
+      } else if (position == 0) {
+        // position is 0, extract whole part
+        // eg: input 123.456, scale 0, result 123
+        new jm.BigDecimal(wholePart)
+      } else {
+        // position is -ve, truncate the whole part by absolute value of position

Review Comment:
   ```suggestion
        // Here we truncate the whole part by the absolute value of the position.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = 
newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this 
expression. The default
+   * behavior is to call the eval method of the expression. Concrete 
expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given 
expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            
s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            
s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            
s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It 
is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of 
BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we 
need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        
Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, 
_scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types
+   */
+  def trunc(input: Long, position: Int): Long = {
+    if (position >= 0) {
+      input
+    } else {
+      // position is -ve, truncate the number by absolute value of position
+      // eg: input 123 , scale -2 , result 100
+      val pow = Math.pow(10, Math.abs(position)).toLong
+      (input / pow) * pow
+    }
+  }
+
+  /**
+   * To truncate double and float type
+   */
+  def trunc(input: Double, position: Int): BigDecimal = {
+    trunc(jm.BigDecimal.valueOf(input), position)
+  }
+
+  /**
+   * To truncate decimal type
+   */
+  def trunc(input: jm.BigDecimal, position: Int): jm.BigDecimal = {
+    if (input.scale < position) {
+      input
+    } else {
+      val wholePart = input.toBigInteger
+      if (position > 0) {
+        // position is +ve , truncate only the decimal part by value of 
position
+        val pow = jm.BigDecimal.valueOf(Math.pow(10, position).toLong)
+        val decimalPart = input.remainder(java.math.BigDecimal.ONE)
+        // To avoid overflow while multiplication, extract decimal part first,
+        // truncate it and then add it to whole part
+        // eg: input 123.456, scale 2, result 123.45
+        if (jm.BigDecimal.ZERO.compareTo(decimalPart) == 0) {
+          new jm.BigDecimal(wholePart)
+        } else {
+          val newRemainder = new 
jm.BigDecimal(decimalPart.multiply(pow).toBigInteger).divide(pow)
+          new jm.BigDecimal(wholePart).add(newRemainder)
+        }
+      } else if (position == 0) {
+        // position is 0, extract whole part
+        // eg: input 123.456, scale 0, result 123
+        new jm.BigDecimal(wholePart)
+      } else {
+        // position is -ve, truncate the whole part by absolute value of 
position
+        // eg: input 123.456, scale -2, result 100
+        if (jm.BigInteger.ZERO.compareTo(wholePart) == 0) {
+          new jm.BigDecimal(wholePart)
+        } else {
+          val pow = jm.BigInteger.valueOf(Math.pow(10, 
Math.abs(position)).toLong)
+          new jm.BigDecimal(wholePart.divide(pow).multiply(pow), 0)
+        }
+      }
+    }
+  }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """_FUNC_(number[, position]) - Returns the number after truncated 
to the specified places.
+    An optional `position` parameter can be specified to truncate digits to 
the right of the decimal point.
+    If 0, it removes all the decimal values and returns only the integer.
+    If negative, the number is truncated to the left side of the decimal point.
+    There is an overloaded version of this function to truncate date values
+    _FUNC_(date, fmt) - Returns `date` with the time portion of the day 
truncated to the unit specified by the format model `fmt`.
+  """,
+  arguments = """
+    Arguments:
+      * number - number to be truncated
+      * position - number of decimal places up to which the given number is to 
be truncated
+    Arguments: To truncate date value:
+      * date - date value or valid date string
+      * fmt - the format representing the unit to be truncated to
+          - "YEAR", "YYYY", "YY" - truncate to the first date of the year that 
the `date` falls in
+          - "QUARTER" - truncate to the first date of the quarter that the 
`date` falls in
+          - "MONTH", "MM", "MON" - truncate to the first date of the month 
that the `date` falls in
+          - "WEEK" - truncate to the Monday of the week that the `date` falls 
in
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_('2019-08-04', 'week');
+       2019-07-29
+      > SELECT _FUNC_('2019-08-04', 'quarter');
+       2019-07-01
+      > SELECT _FUNC_('2009-02-12', 'MM');
+       2009-02-01
+      > SELECT _FUNC_('2015-10-27', 'YEAR');
+       2015-01-01
+      > SELECT _FUNC_(-10.11, 0);
+       -10
+      > SELECT _FUNC_(10.11, -1);
+       10
+      > SELECT _FUNC_(100.61, 0);
+       100
+      > SELECT _FUNC_(-19087.1560, -3);
+       -19000
+      > SELECT _FUNC_(10876.5489, -1);
+       10870
+      > SELECT _FUNC_(-7767.1160, 2);
+       -7767.11
+      > SELECT _FUNC_(17646.6019, 3);
+       17646.601
+  """,
+  since = "3.4.0",
+  group = "math_funcs")
+// scalastyle:on line.size.limit
+object TruncExpressionBuilder extends ExpressionBuilder {
+  override def build(funcName: String, expressions: Seq[Expression]): 
Expression = {
+    val numArgs = expressions.length
+    if (numArgs >= 1) {
+      expressions(0).dataType match {
+        case ByteType |  ShortType | IntegerType | LongType | FloatType | 
DoubleType
+             | DecimalType.Fixed(_, _) => buildTruncNumber(funcName, 
expressions)
+        case _ => buildTruncDate(funcName, expressions)
+      }
+    } else {
+      throw QueryCompilationErrors.invalidFunctionArgumentNumberError(Seq(2), 
funcName, numArgs)
+    }
+  }
+
+  private def buildTruncDate(funcName: String, expressions: Seq[Expression]) = 
{
+    val numArgs = expressions.length
+    if (numArgs == 2) {
+      TruncDate(expressions(0), expressions(1))
+    } else {
+      throw QueryCompilationErrors.invalidFunctionArgumentNumberError(Seq(2), 
funcName, numArgs)
+    }
+  }
+
+  private def buildTruncNumber(funcName: String, expressions: Seq[Expression]) = {
+    val numArgs = expressions.length
+    if (numArgs >= 1) {

Review Comment:
   same here, `if (numArgs < 1) { throw ... }`



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -1432,6 +1695,53 @@ case class Logarithm(left: Expression, right: Expression)
     newLeft: Expression, newRight: Expression): Logarithm = copy(left = 
newLeft, right = newRight)
 }
 
+trait BaseBinaryExpression extends BinaryExpression
+  with ExpectsInputTypes
+  with Serializable
+  with ImplicitCastInputTypes {
+  val child: Expression
+  val scale: Expression
+  override def left: Expression = child
+  override def right: Expression = scale
+  override def nullable: Boolean = true
+  override def foldable: Boolean = child.foldable
+
+  /**
+   * Expected input types from child expressions. The i-th position in the returned seq indicates
+   * the type requirement for the i-th child.
+   *
+   * The possible values at each position are:
+   *   1. a specific data type, e.g. LongType, StringType.

Review Comment:
   ```suggestion
      *   1. a specific data type, such as LongType or StringType.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = 
newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this 
expression. The default
+   * behavior is to call the eval method of the expression. Concrete 
expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given 
expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            
s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            
s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            
s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It 
is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of 
BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we 
need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        
Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, 
_scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types
+   */
+  def trunc(input: Long, position: Int): Long = {
+    if (position >= 0) {
+      input
+    } else {
+      // position is -ve, truncate the number by absolute value of position
+      // eg: input 123 , scale -2 , result 100
+      val pow = Math.pow(10, Math.abs(position)).toLong
+      (input / pow) * pow
+    }
+  }
+
+  /**
+   * To truncate double and float type
+   */
+  def trunc(input: Double, position: Int): BigDecimal = {
+    trunc(jm.BigDecimal.valueOf(input), position)
+  }
+
+  /**
+   * To truncate decimal type
+   */
+  def trunc(input: jm.BigDecimal, position: Int): jm.BigDecimal = {
+    if (input.scale < position) {
+      input
+    } else {
+      val wholePart = input.toBigInteger
+      if (position > 0) {
+        // position is +ve , truncate only the decimal part by value of 
position
+        val pow = jm.BigDecimal.valueOf(Math.pow(10, position).toLong)
+        val decimalPart = input.remainder(java.math.BigDecimal.ONE)
+        // To avoid overflow while multiplication, extract decimal part first,
+        // truncate it and then add it to whole part
+        // eg: input 123.456, scale 2, result 123.45
+        if (jm.BigDecimal.ZERO.compareTo(decimalPart) == 0) {
+          new jm.BigDecimal(wholePart)
+        } else {
+          val newRemainder = new 
jm.BigDecimal(decimalPart.multiply(pow).toBigInteger).divide(pow)
+          new jm.BigDecimal(wholePart).add(newRemainder)
+        }
+      } else if (position == 0) {
+        // position is 0, extract whole part
+        // eg: input 123.456, scale 0, result 123
+        new jm.BigDecimal(wholePart)
+      } else {
+        // position is -ve, truncate the whole part by absolute value of 
position
+        // eg: input 123.456, scale -2, result 100
+        if (jm.BigInteger.ZERO.compareTo(wholePart) == 0) {
+          new jm.BigDecimal(wholePart)
+        } else {
+          val pow = jm.BigInteger.valueOf(Math.pow(10, 
Math.abs(position)).toLong)
+          new jm.BigDecimal(wholePart.divide(pow).multiply(pow), 0)
+        }
+      }
+    }
+  }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """_FUNC_(number[, position]) - Returns the number after truncated 
to the specified places.
+    An optional `position` parameter can be specified to truncate digits to 
the right of the decimal point.
+    If 0, it removes all the decimal values and returns only the integer.
+    If negative, the number is truncated to the left side of the decimal point.
+    There is an overloaded version of this function to truncate date values
+    _FUNC_(date, fmt) - Returns `date` with the time portion of the day 
truncated to the unit specified by the format model `fmt`.
+  """,
+  arguments = """
+    Arguments:
+      * number - number to be truncated
+      * position - number of decimal places up to which the given number is to 
be truncated
+    Arguments: To truncate date value:
+      * date - date value or valid date string
+      * fmt - the format representing the unit to be truncated to
+          - "YEAR", "YYYY", "YY" - truncate to the first date of the year that 
the `date` falls in
+          - "QUARTER" - truncate to the first date of the quarter that the 
`date` falls in
+          - "MONTH", "MM", "MON" - truncate to the first date of the month 
that the `date` falls in
+          - "WEEK" - truncate to the Monday of the week that the `date` falls 
in
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_('2019-08-04', 'week');
+       2019-07-29
+      > SELECT _FUNC_('2019-08-04', 'quarter');
+       2019-07-01
+      > SELECT _FUNC_('2009-02-12', 'MM');
+       2009-02-01
+      > SELECT _FUNC_('2015-10-27', 'YEAR');
+       2015-01-01
+      > SELECT _FUNC_(-10.11, 0);
+       -10
+      > SELECT _FUNC_(10.11, -1);
+       10
+      > SELECT _FUNC_(100.61, 0);
+       100
+      > SELECT _FUNC_(-19087.1560, -3);
+       -19000
+      > SELECT _FUNC_(10876.5489, -1);
+       10870
+      > SELECT _FUNC_(-7767.1160, 2);
+       -7767.11
+      > SELECT _FUNC_(17646.6019, 3);
+       17646.601
+  """,
+  since = "3.4.0",
+  group = "math_funcs")
+// scalastyle:on line.size.limit
+object TruncExpressionBuilder extends ExpressionBuilder {
+  override def build(funcName: String, expressions: Seq[Expression]): Expression = {
+    val numArgs = expressions.length
+    if (numArgs >= 1) {
+      expressions(0).dataType match {
+        case ByteType |  ShortType | IntegerType | LongType | FloatType | DoubleType
+             | DecimalType.Fixed(_, _) => buildTruncNumber(funcName, expressions)
+        case _ => buildTruncDate(funcName, expressions)
+      }
+    } else {
+      throw QueryCompilationErrors.invalidFunctionArgumentNumberError(Seq(2), funcName, numArgs)

Review Comment:
   you can check `if (numArgs < 1)` and throw the error first, de-denting the rest of the block to simplify the code.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = 
newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this 
expression. The default
+   * behavior is to call the eval method of the expression. Concrete 
expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given 
expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            
s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            
s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            
s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It 
is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of 
BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we 
need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        
Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, 
_scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types
+   */
+  def trunc(input: Long, position: Int): Long = {
+    if (position >= 0) {
+      input
+    } else {
+      // position is -ve, truncate the number by absolute value of position
+      // eg: input 123 , scale -2 , result 100
+      val pow = Math.pow(10, Math.abs(position)).toLong
+      (input / pow) * pow
+    }
+  }
+
+  /**
+   * To truncate double and float type
+   */
+  def trunc(input: Double, position: Int): BigDecimal = {
+    trunc(jm.BigDecimal.valueOf(input), position)
+  }
+
+  /**
+   * To truncate decimal type
+   */
+  def trunc(input: jm.BigDecimal, position: Int): jm.BigDecimal = {
+    if (input.scale < position) {
+      input
+    } else {
+      val wholePart = input.toBigInteger
+      if (position > 0) {
+        // position is +ve , truncate only the decimal part by value of 
position
+        val pow = jm.BigDecimal.valueOf(Math.pow(10, position).toLong)
+        val decimalPart = input.remainder(java.math.BigDecimal.ONE)
+        // To avoid overflow while multiplication, extract decimal part first,
+        // truncate it and then add it to whole part
+        // eg: input 123.456, scale 2, result 123.45
+        if (jm.BigDecimal.ZERO.compareTo(decimalPart) == 0) {
+          new jm.BigDecimal(wholePart)
+        } else {
+          val newRemainder = new 
jm.BigDecimal(decimalPart.multiply(pow).toBigInteger).divide(pow)
+          new jm.BigDecimal(wholePart).add(newRemainder)
+        }
+      } else if (position == 0) {
+        // position is 0, extract whole part
+        // eg: input 123.456, scale 0, result 123
+        new jm.BigDecimal(wholePart)
+      } else {
+        // position is -ve, truncate the whole part by absolute value of 
position
+        // eg: input 123.456, scale -2, result 100
+        if (jm.BigInteger.ZERO.compareTo(wholePart) == 0) {
+          new jm.BigDecimal(wholePart)
+        } else {
+          val pow = jm.BigInteger.valueOf(Math.pow(10, 
Math.abs(position)).toLong)
+          new jm.BigDecimal(wholePart.divide(pow).multiply(pow), 0)
+        }
+      }
+    }
+  }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """_FUNC_(number[, position]) - Returns the number after truncated 
to the specified places.
+    An optional `position` parameter can be specified to truncate digits to 
the right of the decimal point.
+    If 0, it removes all the decimal values and returns only the integer.
+    If negative, the number is truncated to the left side of the decimal point.
+    There is an overloaded version of this function to truncate date values
+    _FUNC_(date, fmt) - Returns `date` with the time portion of the day 
truncated to the unit specified by the format model `fmt`.
+  """,
+  arguments = """
+    Arguments:
+      * number - number to be truncated
+      * position - number of decimal places up to which the given number is to 
be truncated
+    Arguments: To truncate date value:

Review Comment:
   ```suggestion
       Arguments: To truncate date values:
   ```
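
   As a side note on the decimal path in the hunk above: the split-truncate-recombine approach used in `TruncNumber.trunc(jm.BigDecimal, Int)` can be illustrated with this standalone sketch (the zero-value fast paths from the hunk are omitted here for brevity):

   ```scala
   import java.math.{BigDecimal => JBigDecimal, BigInteger}

   // Split the value into whole and fractional parts, truncate only the fractional
   // part, then recombine, so multiplying by 10^position cannot overflow the whole part.
   def truncDecimal(input: JBigDecimal, position: Int): JBigDecimal = {
     if (input.scale < position) {
       input
     } else if (position > 0) {
       val pow = JBigDecimal.valueOf(Math.pow(10, position).toLong)
       val decimalPart = input.remainder(JBigDecimal.ONE)
       val truncatedFraction =
         new JBigDecimal(decimalPart.multiply(pow).toBigInteger).divide(pow)
       new JBigDecimal(input.toBigInteger).add(truncatedFraction)
     } else if (position == 0) {
       new JBigDecimal(input.toBigInteger)
     } else {
       val pow = BigInteger.valueOf(Math.pow(10, -position).toLong)
       new JBigDecimal(input.toBigInteger.divide(pow).multiply(pow), 0)
     }
   }

   // e.g. truncDecimal(new JBigDecimal("123.456"), 2)  == 123.45
   //      truncDecimal(new JBigDecimal("123.456"), -2) == 100
   ```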



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = 
newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this 
expression. The default
+   * behavior is to call the eval method of the expression. Concrete 
expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given 
expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            
s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            
s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            
s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It 
is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of 
BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we 
need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        
Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, 
_scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types
+   */
+  def trunc(input: Long, position: Int): Long = {
+    if (position >= 0) {
+      input
+    } else {
+      // position is -ve, truncate the number by absolute value of position
+      // eg: input 123 , scale -2 , result 100
+      val pow = Math.pow(10, Math.abs(position)).toLong
+      (input / pow) * pow
+    }
+  }
+
+  /**
+   * To truncate double and float type
+   */
+  def trunc(input: Double, position: Int): BigDecimal = {
+    trunc(jm.BigDecimal.valueOf(input), position)
+  }
+
+  /**
+   * To truncate decimal type
+   */
+  def trunc(input: jm.BigDecimal, position: Int): jm.BigDecimal = {
+    if (input.scale < position) {
+      input
+    } else {
+      val wholePart = input.toBigInteger
+      if (position > 0) {
+        // position is +ve , truncate only the decimal part by value of 
position
+        val pow = jm.BigDecimal.valueOf(Math.pow(10, position).toLong)
+        val decimalPart = input.remainder(java.math.BigDecimal.ONE)
+        // To avoid overflow while multiplication, extract decimal part first,
+        // truncate it and then add it to whole part
+        // eg: input 123.456, scale 2, result 123.45
+        if (jm.BigDecimal.ZERO.compareTo(decimalPart) == 0) {
+          new jm.BigDecimal(wholePart)
+        } else {
+          val newRemainder = new 
jm.BigDecimal(decimalPart.multiply(pow).toBigInteger).divide(pow)
+          new jm.BigDecimal(wholePart).add(newRemainder)
+        }
+      } else if (position == 0) {
+        // position is 0, extract whole part
+        // eg: input 123.456, scale 0, result 123
+        new jm.BigDecimal(wholePart)
+      } else {
+        // position is -ve, truncate the whole part by absolute value of 
position
+        // eg: input 123.456, scale -2, result 100
+        if (jm.BigInteger.ZERO.compareTo(wholePart) == 0) {
+          new jm.BigDecimal(wholePart)
+        } else {
+          val pow = jm.BigInteger.valueOf(Math.pow(10, 
Math.abs(position)).toLong)
+          new jm.BigDecimal(wholePart.divide(pow).multiply(pow), 0)
+        }
+      }
+    }
+  }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """_FUNC_(number[, position]) - Returns the number after truncated 
to the specified places.

Review Comment:
   ```suggestion
     usage = """_FUNC_(number[, position]) - Returns the number after 
truncating to the specified number of digits.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala:
##########
@@ -331,6 +332,268 @@ case class RoundCeil(child: Expression, scale: Expression)
     copy(child = newLeft, scale = newRight)
 }
 
+case class TruncNumber(child: Expression, scale: Expression)
+  extends BaseBinaryExpression with NullIntolerant {
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): TruncNumber = copy(child = newLeft, scale = 
newRight)
+
+  /**
+   * Returns Java source code that can be compiled to evaluate this 
expression. The default
+   * behavior is to call the eval method of the expression. Concrete 
expression implementations
+   * should override this to do actual code generation.
+   *
+   * @param ctx
+   *   a [[CodegenContext]]
+   * @param ev
+   *   an [[ExprCode]] with unique terms.
+   * @return
+   *   an [[ExprCode]] containing the Java source code to generate the given 
expression
+   */
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode =
+    defineCodeGen(
+      ctx,
+      ev,
+      (input, _) => {
+        dataType match {
+          case ByteType if (_scale <= 0) =>
+            
s"""(byte)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case ShortType if (_scale <= 0) =>
+            
s"""(short)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case IntegerType if (_scale <= 0) =>
+            
s"""(int)(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |(long)$input, ${_scale}))""".stripMargin
+          case LongType if (_scale <= 0) =>
+            s"""(org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}))""".stripMargin
+          case FloatType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).floatValue()""".stripMargin
+          case DoubleType if (_scale <= 0) =>
+            s"""org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |$input, ${_scale}).doubleValue()""".stripMargin
+          case DecimalType.Fixed(_, _) =>
+            s"""Decimal.apply(
+             |org.apache.spark.sql.catalyst.expressions.TruncNumber.trunc(
+             |${input}.toJavaBigDecimal(), ${_scale}))""".stripMargin
+          case _ => s"$input"
+        }
+      })
+
+  /**
+   * Returns the [[DataType]] of the result of evaluating this expression. It 
is invalid to query
+   * the dataType of an unresolved expression (i.e., when `resolved` == false).
+   */
+  override lazy val dataType: DataType = {
+    child.dataType match {
+      case DecimalType.Fixed(p, s) =>
+        val newPosition =
+          if (_scale > 0) {
+            if (_scale >= s) {
+              s
+            } else {
+              _scale
+            }
+          } else {
+            0
+          }
+        DecimalType(p - s + newPosition, newPosition)
+      case t => t
+    }
+  }
+
+  /**
+   * Called by default [[eval]] implementation. If subclass of 
BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code. If we 
need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+    dataType match {
+      case ByteType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Byte].toLong, _scale).toByte
+      case ShortType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Short].toLong, _scale).shortValue
+      case IntegerType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Int].toLong, _scale).intValue
+      case LongType if (_scale <= 0) =>
+        TruncNumber.trunc(input1.asInstanceOf[Long], _scale).longValue
+      case FloatType =>
+        TruncNumber.trunc(input1.asInstanceOf[Float], _scale).floatValue
+      case DoubleType =>
+        TruncNumber.trunc(input1.asInstanceOf[Double], _scale).doubleValue
+      case DecimalType.Fixed(p, s) =>
+        
Decimal(TruncNumber.trunc(input1.asInstanceOf[Decimal].toJavaBigDecimal, 
_scale))
+      case _ => input1
+    }
+  }
+}
+
+object TruncNumber {
+  /**
+   * To truncate whole numbers ; byte, short, int, long types
+   */
+  def trunc(input: Long, position: Int): Long = {
+    if (position >= 0) {
+      input
+    } else {
+      // position is -ve, truncate the number by absolute value of position
+      // eg: input 123 , scale -2 , result 100
+      val pow = Math.pow(10, Math.abs(position)).toLong
+      (input / pow) * pow
+    }
+  }
+
+  /**
+   * To truncate double and float type
+   */
+  def trunc(input: Double, position: Int): BigDecimal = {
+    trunc(jm.BigDecimal.valueOf(input), position)
+  }
+
+  /**
+   * To truncate decimal type
+   */
+  def trunc(input: jm.BigDecimal, position: Int): jm.BigDecimal = {
+    if (input.scale < position) {
+      input
+    } else {
+      val wholePart = input.toBigInteger
+      if (position > 0) {
+        // position is +ve , truncate only the decimal part by value of 
position
+        val pow = jm.BigDecimal.valueOf(Math.pow(10, position).toLong)
+        val decimalPart = input.remainder(java.math.BigDecimal.ONE)
+        // To avoid overflow while multiplication, extract decimal part first,
+        // truncate it and then add it to whole part
+        // eg: input 123.456, scale 2, result 123.45
+        if (jm.BigDecimal.ZERO.compareTo(decimalPart) == 0) {
+          new jm.BigDecimal(wholePart)
+        } else {
+          val newRemainder = new 
jm.BigDecimal(decimalPart.multiply(pow).toBigInteger).divide(pow)
+          new jm.BigDecimal(wholePart).add(newRemainder)
+        }
+      } else if (position == 0) {
+        // position is 0, extract whole part
+        // eg: input 123.456, scale 0, result 123
+        new jm.BigDecimal(wholePart)
+      } else {
+        // position is -ve, truncate the whole part by absolute value of 
position
+        // eg: input 123.456, scale -2, result 100
+        if (jm.BigInteger.ZERO.compareTo(wholePart) == 0) {
+          new jm.BigDecimal(wholePart)
+        } else {
+          val pow = jm.BigInteger.valueOf(Math.pow(10, 
Math.abs(position)).toLong)
+          new jm.BigDecimal(wholePart.divide(pow).multiply(pow), 0)
+        }
+      }
+    }
+  }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """_FUNC_(number[, position]) - Returns the number after truncated 
to the specified places.
+    An optional `position` parameter can be specified to truncate digits to 
the right of the decimal point.
+    If 0, it removes all the decimal values and returns only the integer.
+    If negative, the number is truncated to the left side of the decimal point.
+    There is an overloaded version of this function to truncate date values
+    _FUNC_(date, fmt) - Returns `date` with the time portion of the day 
truncated to the unit specified by the format model `fmt`.
+  """,
+  arguments = """
+    Arguments:
+      * number - number to be truncated
+      * position - number of decimal places up to which the given number is to 
be truncated
+    Arguments: To truncate date value:
+      * date - date value or valid date string
+      * fmt - the format representing the unit to be truncated to
+          - "YEAR", "YYYY", "YY" - truncate to the first date of the year that 
the `date` falls in
+          - "QUARTER" - truncate to the first date of the quarter that the 
`date` falls in
+          - "MONTH", "MM", "MON" - truncate to the first date of the month 
that the `date` falls in
+          - "WEEK" - truncate to the Monday of the week that the `date` falls 
in
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_('2019-08-04', 'week');
+       2019-07-29
+      > SELECT _FUNC_('2019-08-04', 'quarter');
+       2019-07-01
+      > SELECT _FUNC_('2009-02-12', 'MM');
+       2009-02-01
+      > SELECT _FUNC_('2015-10-27', 'YEAR');
+       2015-01-01
+      > SELECT _FUNC_(-10.11, 0);
+       -10
+      > SELECT _FUNC_(10.11, -1);
+       10
+      > SELECT _FUNC_(100.61, 0);
+       100
+      > SELECT _FUNC_(-19087.1560, -3);
+       -19000
+      > SELECT _FUNC_(10876.5489, -1);
+       10870
+      > SELECT _FUNC_(-7767.1160, 2);
+       -7767.11
+      > SELECT _FUNC_(17646.6019, 3);
+       17646.601
+  """,
+  since = "3.4.0",
+  group = "math_funcs")
+// scalastyle:on line.size.limit
+object TruncExpressionBuilder extends ExpressionBuilder {
+  override def build(funcName: String, expressions: Seq[Expression]): 
Expression = {
+    val numArgs = expressions.length
+    if (numArgs >= 1) {
+      expressions(0).dataType match {
+        case ByteType |  ShortType | IntegerType | LongType | FloatType | 
DoubleType
+             | DecimalType.Fixed(_, _) => buildTruncNumber(funcName, 
expressions)
+        case _ => buildTruncDate(funcName, expressions)
+      }
+    } else {
+      throw QueryCompilationErrors.invalidFunctionArgumentNumberError(Seq(2), 
funcName, numArgs)
+    }
+  }
+
+  private def buildTruncDate(funcName: String, expressions: Seq[Expression]) = 
{
+    val numArgs = expressions.length
+    if (numArgs == 2) {
+      TruncDate(expressions(0), expressions(1))
+    } else {
+      throw QueryCompilationErrors.invalidFunctionArgumentNumberError(Seq(2), 
funcName, numArgs)
+    }
+  }
+
+  private def buildTruncNumber(funcName: String, expressions: Seq[Expression]) 
= {
+    val numArgs = expressions.length
+    if (numArgs >= 1) {
+      val position = if (numArgs == 2) {
+        val positionExpr = expressions(1)
+        if (!(positionExpr.foldable && positionExpr.dataType == IntegerType)) {
+          throw QueryCompilationErrors.requireLiteralParameter(funcName, 
"position", "int")
+        }
+        val scale_value = positionExpr.eval()
+        if (scale_value == null) {
+          throw QueryCompilationErrors.requireLiteralParameter(funcName, 
"position", "int")
+        } else {

Review Comment:
   no need for an `else` here, since throwing the exception already exits the function
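
   For illustration, a minimal sketch of the block with the redundant `else` dropped. Note the quoted hunk is cut off right after the trailing `else {`, so the last two branches below are assumptions about what the elided code returns, not a copy of the PR:

   ```scala
   // Sketch only: the throws already exit the method, so no `else` nesting is needed.
   val position = if (numArgs == 2) {
     val positionExpr = expressions(1)
     if (!(positionExpr.foldable && positionExpr.dataType == IntegerType)) {
       throw QueryCompilationErrors.requireLiteralParameter(funcName, "position", "int")
     }
     val scaleValue = positionExpr.eval()
     if (scaleValue == null) {
       throw QueryCompilationErrors.requireLiteralParameter(funcName, "position", "int")
     }
     scaleValue.asInstanceOf[Int] // assumption: the elided branch returns the evaluated literal
   } else {
     0 // assumption: default position when only one argument is given
   }
   ```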



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

