vinodkc commented on code in PR #53376:
URL: https://github.com/apache/spark/pull/53376#discussion_r2654352496


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala:
##########
@@ -2532,6 +2534,148 @@ case class DateDiff(endDate: Expression, startDate: 
Expression)
     copy(endDate = newLeft, startDate = newRight)
 }
 
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(bucket_width, timestamp[, origin]) - Returns the start of the 
timestamp bucket
+    containing the input timestamp, aligned to the specified origin.
+  """,
+  arguments = """
+    Arguments:
+      * bucket_width - A day-time interval specifying the width of each bucket 
(must be positive).
+      * timestamp - The temporal value to bucket (DATE, TIMESTAMP, or 
TIMESTAMP_NTZ).
+      * origin - Optional alignment point for bucket boundaries (default: 
1970-01-01 00:00:00 UTC).
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(INTERVAL '1' HOUR, TIMESTAMP'2024-12-04 14:30:00', 
TIMESTAMP'2024-12-04 00:00:00');
+       2024-12-04 14:00:00
+      > SELECT _FUNC_(INTERVAL '6' HOUR, TIMESTAMP'2024-12-04 14:30:00', 
TIMESTAMP'2024-12-04 00:00:00');
+       2024-12-04 12:00:00
+      > SELECT _FUNC_(INTERVAL '7' DAY, DATE'2024-12-04', TIMESTAMP'1970-01-01 
00:00:00');
+       2024-11-28 00:00:00
+      > SELECT _FUNC_(INTERVAL '7' DAY, DATE'2024-12-04', TIMESTAMP'1970-01-05 
00:00:00');
+       2024-12-02 00:00:00
+  """,
+  note = """
+    Notes:
+      * Buckets are aligned to the specified origin (default: Unix epoch).
+      * The bucket width must be a positive day-time interval (constant 
expression).
+      * The origin must be a constant TIMESTAMP expression.
+      * Sub-day intervals (hours, minutes, seconds) are supported.
+      * Timestamps before the origin are handled correctly using floor 
division.
+      * Always returns TIMESTAMP type, regardless of input type.
+  """,
+  group = "datetime_funcs",
+  since = "4.2.0")
+// scalastyle:on line.size.limit
+case class TimestampBucket(
+    bucketWidth: Expression,
+    timestamp: Expression,
+    originTimestamp: Expression)
+  extends TernaryExpression with ImplicitCastInputTypes {
+
+  // Constructor for 2-argument version (default origin = epoch)
+  def this(bucketWidth: Expression, timestamp: Expression) = {
+    this(bucketWidth, timestamp, Literal(0L, TimestampType))
+  }
+
+  override def first: Expression = bucketWidth
+  override def second: Expression = timestamp
+  override def third: Expression = originTimestamp
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(
+    DayTimeIntervalType,
+    TypeCollection(DateType, TimestampType, TimestampNTZType),
+    TimestampType
+  )
+
+  override def dataType: DataType = TimestampType
+
+  override def nullable: Boolean =
+    bucketWidth.nullable || timestamp.nullable || originTimestamp.nullable
+
+  override def nullIntolerant: Boolean = true
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    super.checkInputDataTypes() match {
+      case TypeCheckSuccess =>
+        // Validate bucket_width is foldable (constant expression)
+        if (!bucketWidth.foldable) {
+          return DataTypeMismatch(
+            errorSubClass = "NON_FOLDABLE_INPUT",
+            messageParameters = Map(
+              "inputName" -> toSQLId("bucket_width"),
+              "inputType" -> toSQLType(bucketWidth.dataType),
+              "inputExpr" -> toSQLExpr(bucketWidth)
+            )
+          )
+        }
+
+        if (!originTimestamp.foldable) {
+          return DataTypeMismatch(
+            errorSubClass = "NON_FOLDABLE_INPUT",
+            messageParameters = Map(
+              "inputName" -> toSQLId("origin"),
+              "inputType" -> toSQLType(originTimestamp.dataType),
+              "inputExpr" -> toSQLExpr(originTimestamp)
+            )
+          )
+        }
+
+        val widthValue = bucketWidth.eval()
+        if (widthValue != null) {
+          val widthMicros = widthValue.asInstanceOf[Long]
+          if (widthMicros <= 0) {
+            return DataTypeMismatch(
+              errorSubClass = "VALUE_OUT_OF_RANGE",
+              messageParameters = Map(
+                "exprName" -> "bucket_width",
+                "valueRange" -> s"(0, ${Long.MaxValue}]",
+                "currentValue" -> widthMicros.toString
+              )
+            )
+          }
+        }
+
+        TypeCheckSuccess
+
+      case failure => failure
+    }
+  }
+
+  override def nullSafeEval(
+      bucketWidthValue: Any,
+      timestampValue: Any,
+      originValue: Any): Any = {
+    val bucketMicros = bucketWidthValue.asInstanceOf[Long]
+
+    val timestampMicros = timestampValue match {
+      case days: Int => days.toLong * MICROS_PER_DAY
+      case micros: Long => micros
+    }
+
+    val originMicros = originValue.asInstanceOf[Long]
+    val shiftedTimestamp = timestampMicros - originMicros
+    val bucketIndex = Math.floorDiv(shiftedTimestamp, bucketMicros)
+
+    originMicros + (bucketIndex * bucketMicros)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {

Review Comment:
   Yes, a data type case match is required for both codegen and interpreted
   mode. Updated the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to