Github user yhuai commented on a diff in the pull request:
https://github.com/apache/spark/pull/9819#discussion_r47451906
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
---
@@ -246,85 +260,238 @@ object SpecifiedWindowFrame {
}
}
+case class UnresolvedWindowExpression(
+ child: Expression,
+ windowSpec: WindowSpecReference) extends UnaryExpression with
Unevaluable {
+
+ override def dataType: DataType = throw new UnresolvedException(this,
"dataType")
+ override def foldable: Boolean = throw new UnresolvedException(this,
"foldable")
+ override def nullable: Boolean = throw new UnresolvedException(this,
"nullable")
+ override lazy val resolved = false
+}
+
+case class WindowExpression(
+ windowFunction: Expression,
+ windowSpec: WindowSpecDefinition) extends Expression with Unevaluable {
+
+ override def children: Seq[Expression] = windowFunction :: windowSpec ::
Nil
+
+ override def dataType: DataType = windowFunction.dataType
+ override def foldable: Boolean = windowFunction.foldable
+ override def nullable: Boolean = windowFunction.nullable
+
+ override def toString: String = s"$windowFunction $windowSpec"
+}
+
/**
- * Every window function needs to maintain a output buffer for its output.
- * It should expect that for a n-row window frame, it will be called n
times
- * to retrieve value corresponding with these n rows.
+ * A window function is a function that can only be evaluated in the
context of a window operator.
*/
trait WindowFunction extends Expression {
- def init(): Unit
+ /** Frame in which the window operator must be executed. */
+ def frame: WindowFrame = UnspecifiedFrame
+}
- def reset(): Unit
+/**
+ * An offset window function is a window function that returns the value
of the input column offset
+ * by a number of rows within the partition. For instance: an
OffsetWindowfunction for value x with
+ * offset -2, will get the value of x 2 rows back in the partition.
+ */
+abstract class OffsetWindowFunction
+ extends Expression with WindowFunction with Unevaluable with
ImplicitCastInputTypes {
+ val input: Expression
+ val default: Expression
+ val offset: Expression
+ val offsetSign: Int
+
+ override def children: Seq[Expression] = Seq(input, offset, default)
- def prepareInputParameters(input: InternalRow): AnyRef
+ override def foldable: Boolean = input.foldable && (default == null ||
default.foldable)
- def update(input: AnyRef): Unit
+ override def nullable: Boolean = input.nullable && (default == null ||
default.nullable)
- def batchUpdate(inputs: Array[AnyRef]): Unit
+ override lazy val frame = {
+ // This will be triggered by the Analyzer.
+ val offsetValue = offset.eval() match {
+ case o: Int => o
+ case x => throw new AnalysisException(
+ s"Offset expression must be a foldable integer expression: $x")
+ }
+ val boundary = ValueFollowing(offsetSign * offsetValue)
+ SpecifiedWindowFrame(RowFrame, boundary, boundary)
+ }
- def evaluate(): Unit
+ override def dataType: DataType = input.dataType
- def get(index: Int): Any
+ override def inputTypes: Seq[AbstractDataType] =
+ Seq(AnyDataType, IntegerType, TypeCollection(input.dataType, NullType))
- def newInstance(): WindowFunction
+ override def toString: String = s"$prettyName($input, $offset, $default)"
}
-case class UnresolvedWindowFunction(
- name: String,
- children: Seq[Expression])
- extends Expression with WindowFunction with Unevaluable {
+case class Lead(input: Expression, offset: Expression, default: Expression)
+ extends OffsetWindowFunction {
- override def dataType: DataType = throw new UnresolvedException(this,
"dataType")
- override def foldable: Boolean = throw new UnresolvedException(this,
"foldable")
- override def nullable: Boolean = throw new UnresolvedException(this,
"nullable")
- override lazy val resolved = false
+ def this(input: Expression, offset: Expression) = this(input, offset,
Literal(null))
- override def init(): Unit = throw new UnresolvedException(this, "init")
- override def reset(): Unit = throw new UnresolvedException(this, "reset")
- override def prepareInputParameters(input: InternalRow): AnyRef =
- throw new UnresolvedException(this, "prepareInputParameters")
- override def update(input: AnyRef): Unit = throw new
UnresolvedException(this, "update")
- override def batchUpdate(inputs: Array[AnyRef]): Unit =
- throw new UnresolvedException(this, "batchUpdate")
- override def evaluate(): Unit = throw new UnresolvedException(this,
"evaluate")
- override def get(index: Int): Any = throw new UnresolvedException(this,
"get")
+ def this(input: Expression) = this(input, Literal(1))
- override def toString: String = s"'$name(${children.mkString(",")})"
+ def this() = this(Literal(null))
- override def newInstance(): WindowFunction = throw new
UnresolvedException(this, "newInstance")
+ val offsetSign = 1
}
-case class UnresolvedWindowExpression(
- child: UnresolvedWindowFunction,
- windowSpec: WindowSpecReference) extends UnaryExpression with
Unevaluable {
+case class Lag(input: Expression, offset: Expression, default: Expression)
+ extends OffsetWindowFunction {
- override def dataType: DataType = throw new UnresolvedException(this,
"dataType")
- override def foldable: Boolean = throw new UnresolvedException(this,
"foldable")
- override def nullable: Boolean = throw new UnresolvedException(this,
"nullable")
- override lazy val resolved = false
-}
+ def this(input: Expression, offset: Expression) = this(input, offset,
Literal(null))
-case class WindowExpression(
- windowFunction: WindowFunction,
- windowSpec: WindowSpecDefinition) extends Expression with Unevaluable {
+ def this(input: Expression) = this(input, Literal(1))
- override def children: Seq[Expression] = windowFunction :: windowSpec ::
Nil
+ def this() = this(Literal(null))
- override def dataType: DataType = windowFunction.dataType
- override def foldable: Boolean = windowFunction.foldable
- override def nullable: Boolean = windowFunction.nullable
+ val offsetSign = -1
+}
- override def toString: String = s"$windowFunction $windowSpec"
+abstract class AggregateWindowFunction extends DeclarativeAggregate with
WindowFunction {
+ self: Product =>
+ override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding,
CurrentRow)
+ override def dataType: DataType = IntegerType
+ override def nullable: Boolean = false
+ override val mergeExpressions = Nil // TODO how to deal with this?
--- End diff --
`AggregateFunction` has a `supportsPartial` field. Could we override it and
set it to `false`? Also, if `mergeExpressions` is never expected to be
called, it could throw an `UnsupportedOperationException`.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]