Github user scwf commented on a diff in the pull request:
https://github.com/apache/spark/pull/5604#discussion_r29544497
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
---
@@ -189,6 +194,200 @@ private[hive] case class HiveGenericUdf(funcWrapper:
HiveFunctionWrapper, childr
}
}
+private[spark] object ResolveHiveWindowFunction extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ case p: LogicalPlan if !p.childrenResolved => p
+
+ // We are resolving WindowExpressions at here.
+ case p: LogicalPlan =>
+ p transformExpressions {
+ case WindowExpression(
+ UnresolvedWindowFunction(name, children),
+ windowSpec: WindowSpecDefinition) =>
+ // First, let's find the window function info.
+ val windowFunctionInfo: WindowFunctionInfo =
+
Option(FunctionRegistry.getWindowFunctionInfo(name.toLowerCase)).getOrElse(
+ throw new AnalysisException(s"Couldn't find window function
$name"))
+
+ val functionClassName =
windowFunctionInfo.getFunctionClass.getName
+ val newChildren =
+ // Rank(), DENSE_RANK(), CUME_DIST(), and PERCENT_RANK do not
take explicit
+ // input parameters. These functions in Hive require implicit
parameters, which
+ // are expressions in Order By clause.
+ if
(classOf[GenericUDAFRank].isAssignableFrom(windowFunctionInfo.getFunctionClass))
{
+ if (children.nonEmpty) {
+ throw new AnalysisException(s"$name does not take input
parameters.")
+ }
+ windowSpec.orderSpec.map(_.child)
+ } else {
+ children
+ }
+
+ val isUDAFBridgeRequired =
+ if
(classOf[UDAF].isAssignableFrom(windowFunctionInfo.getFunctionClass)) {
+ true
+ } else {
+ false
+ }
+
+ val windowFunction =
+ HiveWindowFunction(
+ new HiveFunctionWrapper(functionClassName),
+ windowFunctionInfo.isPivotResult,
+ isUDAFBridgeRequired,
+ newChildren)
+
+ // Second, check if the specified window function can accept
window definition.
+ windowSpec.frameSpecification match {
+ case frame: SpecifiedWindowFrame if
!windowFunctionInfo.isSupportsWindow =>
+ throw new AnalysisException(
+ s"Window function $name does not take a frame
specification.")
+ case frame: SpecifiedWindowFrame
+ if windowFunctionInfo.isSupportsWindow &&
windowFunctionInfo.isPivotResult =>
+ // These two should not be true at the same time when a
window frame is defined.
+ // If so, throw an exception.
+ throw new AnalysisException(s"Could not handle Hive window
function $name because " +
+ s"it supports both a user specified window frame and pivot
result.")
+ case _ => // OK
+ }
+ // Resolve those UnspecifiedWindowFrame.
+ val newWindowSpec = windowSpec.frameSpecification match {
+ case UnspecifiedFrame =>
+ val newWindowFrame =
+ SpecifiedWindowFrame.defaultWindowFrame(
+ windowSpec.orderSpec.nonEmpty,
+ windowFunctionInfo.isSupportsWindow)
+ WindowSpecDefinition(windowSpec.partitionSpec,
windowSpec.orderSpec, newWindowFrame)
+ case _ => windowSpec
+ }
+
+ WindowExpression(windowFunction, newWindowSpec)
+ }
+ }
+}
+
+
+// If pivotResult is true, the Hive function will return a list of values
representing the
+// values of the added columns.
+private[hive] case class HiveWindowFunction(
+ funcWrapper: HiveFunctionWrapper,
+ pivotResult: Boolean,
+ isUDAFBridgeRequired: Boolean,
+ children: Seq[Expression]) extends WindowFunction
+ with HiveInspectors {
+
+ // Hive window functions are based on GenericUDAFResolver2.
+ type UDFType = GenericUDAFResolver2
+
+ @transient
+ protected lazy val resolver: GenericUDAFResolver2 =
+ if (isUDAFBridgeRequired) {
+ new GenericUDAFBridge(funcWrapper.createFunction[UDAF]())
+ } else {
+ funcWrapper.createFunction[GenericUDAFResolver2]()
+ }
+
+ @transient
+ protected lazy val inputInspectors = children.map(toInspector).toArray
+
+ // The GenericUDAFEvaluator used to evaluate the window function.
+ @transient
+ protected lazy val evaluator: GenericUDAFEvaluator = {
+ val parameterInfo = new
SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+ resolver.getEvaluator(parameterInfo)
+ }
+
+ // The object inspector of values returned from the Hive window fucntion.
--- End diff --
Typo: "fucntion" should be "function".
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]