Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/18540#discussion_r130035366
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
---
@@ -43,57 +42,65 @@ case class WindowSpecDefinition(
orderSpec: Seq[SortOrder],
frameSpecification: WindowFrame) extends Expression with WindowSpec
with Unevaluable {
- def validate: Option[String] = frameSpecification match {
- case UnspecifiedFrame =>
- Some("Found a UnspecifiedFrame. It should be converted to a
SpecifiedWindowFrame " +
- "during analysis. Please file a bug report.")
- case frame: SpecifiedWindowFrame => frame.validate.orElse {
- def checkValueBasedBoundaryForRangeFrame(): Option[String] = {
- if (orderSpec.length > 1) {
- // It is not allowed to have a value-based PRECEDING and
FOLLOWING
- // as the boundary of a Range Window Frame.
- Some("This Range Window Frame only accepts at most one ORDER BY
expression.")
- } else if (orderSpec.nonEmpty &&
!orderSpec.head.dataType.isInstanceOf[NumericType]) {
- Some("The data type of the expression in the ORDER BY clause
should be a numeric type.")
- } else {
- None
- }
- }
-
- (frame.frameType, frame.frameStart, frame.frameEnd) match {
- case (RangeFrame, vp: ValuePreceding, _) =>
checkValueBasedBoundaryForRangeFrame()
- case (RangeFrame, vf: ValueFollowing, _) =>
checkValueBasedBoundaryForRangeFrame()
- case (RangeFrame, _, vp: ValuePreceding) =>
checkValueBasedBoundaryForRangeFrame()
- case (RangeFrame, _, vf: ValueFollowing) =>
checkValueBasedBoundaryForRangeFrame()
- case (_, _, _) => None
- }
- }
- }
-
- override def children: Seq[Expression] = partitionSpec ++ orderSpec
+ override def children: Seq[Expression] = partitionSpec ++ orderSpec :+
frameSpecification
override lazy val resolved: Boolean =
childrenResolved && checkInputDataTypes().isSuccess &&
frameSpecification.isInstanceOf[SpecifiedWindowFrame]
override def nullable: Boolean = true
override def foldable: Boolean = false
- override def dataType: DataType = throw new UnsupportedOperationException
+ override def dataType: DataType = throw new
UnsupportedOperationException("dataType")
- override def sql: String = {
- val partition = if (partitionSpec.isEmpty) {
- ""
- } else {
- "PARTITION BY " + partitionSpec.map(_.sql).mkString(", ") + " "
+ override def checkInputDataTypes(): TypeCheckResult = {
+ frameSpecification match {
+ case UnspecifiedFrame =>
+ TypeCheckFailure(
+ "Cannot use an UnspecifiedFrame. This should have been converted
during analysis. " +
+ "Please file a bug report.")
+ case f: SpecifiedWindowFrame if f.frameType == RangeFrame &&
!f.isUnbounded &&
+ orderSpec.isEmpty =>
+ TypeCheckFailure(
+ "A range window frame cannot be used in an unordered window
specification.")
+ case f: SpecifiedWindowFrame if f.frameType == RangeFrame &&
f.isValueBound &&
+ orderSpec.size > 1 =>
+ TypeCheckFailure(
+ s"A range window frame with value boundaries cannot be used in a
window specification " +
+ s"with multiple order by expressions:
${orderSpec.mkString(",")}")
+ case f: SpecifiedWindowFrame if f.frameType == RangeFrame &&
f.isValueBound &&
+ !isValidFrameType(f.valueBoundary.head.dataType) =>
+ TypeCheckFailure(
+ s"The data type '${orderSpec.head.dataType}' used in the order
specification does " +
+ s"not match the data type '${f.valueBoundary.head.dataType}'
which is used in the " +
+ "range frame.")
+ case f: SpecifiedWindowFrame if !isValidFrameBoundary(f.lower,
f.upper) =>
+ TypeCheckFailure(s"The upper bound of the window frame is
'${f.upper.sql}', which is " +
+ s"smaller than the lower bound '${f.lower.sql}'.")
+ case _ => TypeCheckSuccess
}
+ }
- val order = if (orderSpec.isEmpty) {
- ""
- } else {
- "ORDER BY " + orderSpec.map(_.sql).mkString(", ") + " "
+ override def sql: String = {
+ def toSql(exprs: Seq[Expression], prefix: String): Seq[String] = {
+ Seq(exprs).filter(_.nonEmpty).map(_.map(_.sql).mkString(prefix, ",
", ""))
}
- s"($partition$order${frameSpecification.toString})"
+ val elements =
+ toSql(partitionSpec, "PARTITION BY ") ++
+ toSql(orderSpec, "ORDER BY ") ++
+ Seq(frameSpecification.sql)
+ elements.mkString("(", " ", ")")
+ }
+
+ private def isValidFrameType(ft: DataType): Boolean =
orderSpec.head.dataType == ft
+
+ private def isValidFrameBoundary(lower: Expression, upper: Expression):
Boolean = {
+ (lower, upper) match {
+ case (UnboundedFollowing, _) => false
+ case (_, UnboundedPreceding) => false
+ case (l: Expression, u: SpecialFrameBoundary) => !u.notFollows(l)
--- End diff --
Do we also need to consider the `(u: SpecialFrameBoundary, l: Expression)` case?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not
working, please contact infrastructure at [email protected] or file a JIRA
ticket with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]