Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89230294
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
---
@@ -72,6 +71,30 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
}
/**
+ * Expression representing the current batch time, which is used by StreamExecution to
+ * 1. prevent optimizer from pushing this expression below a stateful operator
+ * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
+ *
+ * There is no code generation since this expression should be replaced with a literal.
+ */
+case class CurrentBatchTimestamp(timestampMs: Long, dataType: DataType)
+ extends LeafExpression with Nondeterministic with CodegenFallback {
+
+ override def nullable: Boolean = false
+
+ override def prettyName: String = "current_batch_timestamp"
+
+ override protected def initializeInternal(partitionIndex: Int): Unit = {}
+
+ override protected def evalInternal(input: InternalRow): Any = timestampMs
+
+ def toLiteral: Literal = dataType match {
+ case _: TimestampType => Literal(timestampMs * 1000L, TimestampType)
--- End diff --
nit: Use
`Literal(DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(timestampMs)), TimestampType)`
instead of multiplying `timestampMs` by `1000L` directly. It is not a big deal
either way, since `timestampMs` is always positive here.
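
To illustrate the two conversions being compared, here is a minimal,
Spark-free sketch. `BatchTimestampSketch` and both method names are
hypothetical, and `millisToMicrosViaTimestamp` only assumes that the helper
derives microseconds from `getTime` and `getNanos`; it is not the actual
`DateTimeUtils` code.

```scala
import java.sql.Timestamp

// Hypothetical names, for illustration only; not part of Spark.
object BatchTimestampSketch {

  // What the diff does: scale epoch milliseconds to microseconds directly.
  def millisToMicrosDirect(ms: Long): Long = ms * 1000L

  // Assumed shape of a Timestamp-based conversion: whole milliseconds come
  // from getTime, and any microseconds below one millisecond come from the
  // nanosecond field. A Timestamp built from a Long has no such remainder.
  def millisToMicrosViaTimestamp(ms: Long): Long = {
    val ts = new Timestamp(ms)
    ts.getTime * 1000L + (ts.getNanos.toLong / 1000L) % 1000L
  }
}
```

Under that assumption, a positive batch time such as `1479772800123L` gives
the same microsecond value from both methods, which matches the point that
the choice matters little here.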