Github user scwf commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5604#discussion_r29544497
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala ---
    @@ -189,6 +194,200 @@ private[hive] case class HiveGenericUdf(funcWrapper: HiveFunctionWrapper, childr
       }
     }
     
    +private[spark] object ResolveHiveWindowFunction extends Rule[LogicalPlan] {
    +  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    +    case p: LogicalPlan if !p.childrenResolved => p
    +
    +        // We are resolving WindowExpressions here.
    +    case p: LogicalPlan =>
    +      p transformExpressions {
    +        case WindowExpression(
    +          UnresolvedWindowFunction(name, children),
    +          windowSpec: WindowSpecDefinition) =>
    +          // First, let's find the window function info.
    +          val windowFunctionInfo: WindowFunctionInfo =
    +            Option(FunctionRegistry.getWindowFunctionInfo(name.toLowerCase)).getOrElse(
    +              throw new AnalysisException(s"Couldn't find window function $name"))
    +
    +          val functionClassName = windowFunctionInfo.getFunctionClass.getName
    +          val newChildren =
    +            // Rank(), DENSE_RANK(), CUME_DIST(), and PERCENT_RANK do not take explicit
    +            // input parameters. These functions in Hive require implicit parameters, which
    +            // are expressions in the Order By clause.
    +            if (classOf[GenericUDAFRank].isAssignableFrom(windowFunctionInfo.getFunctionClass)) {
    +              if (children.nonEmpty) {
    +                throw new AnalysisException(s"$name does not take input parameters.")
    +              }
    +              windowSpec.orderSpec.map(_.child)
    +            } else {
    +              children
    +            }
    +
    +          val isUDAFBridgeRequired =
    +            if (classOf[UDAF].isAssignableFrom(windowFunctionInfo.getFunctionClass)) {
    +              true
    +            } else {
    +              false
    +            }
    +
    +          val windowFunction =
    +            HiveWindowFunction(
    +              new HiveFunctionWrapper(functionClassName),
    +              windowFunctionInfo.isPivotResult,
    +              isUDAFBridgeRequired,
    +              newChildren)
    +
    +          // Second, check if the specified window function can accept a window definition.
    +          windowSpec.frameSpecification match {
    +            case frame: SpecifiedWindowFrame if !windowFunctionInfo.isSupportsWindow =>
    +              throw new AnalysisException(
    +                s"Window function $name does not take a frame 
specification.")
    +            case frame: SpecifiedWindowFrame
    +              if windowFunctionInfo.isSupportsWindow && windowFunctionInfo.isPivotResult =>
    +              // These two should not be true at the same time when a window frame is defined.
    +              // If so, throw an exception.
    +              throw new AnalysisException(s"Could not handle Hive window function $name because " +
    +                s"it supports both a user specified window frame and pivot result.")
    +            case _ => // OK
    +          }
    +          // Resolve those UnspecifiedWindowFrame.
    +          val newWindowSpec = windowSpec.frameSpecification match {
    +            case UnspecifiedFrame =>
    +              val newWindowFrame =
    +                SpecifiedWindowFrame.defaultWindowFrame(
    +                  windowSpec.orderSpec.nonEmpty,
    +                  windowFunctionInfo.isSupportsWindow)
    +              WindowSpecDefinition(windowSpec.partitionSpec, windowSpec.orderSpec, newWindowFrame)
    +            case _ => windowSpec
    +          }
    +
    +          WindowExpression(windowFunction, newWindowSpec)
    +      }
    +  }
    +}
    +
    +
    +// If pivotResult is true, the Hive function will return a list of values representing the
    +// values of the added columns.
    +private[hive] case class HiveWindowFunction(
    +    funcWrapper: HiveFunctionWrapper,
    +    pivotResult: Boolean,
    +    isUDAFBridgeRequired: Boolean,
    +    children: Seq[Expression]) extends WindowFunction
    +  with HiveInspectors {
    +
    +  // Hive window functions are based on GenericUDAFResolver2.
    +  type UDFType = GenericUDAFResolver2
    +
    +  @transient
    +  protected lazy val resolver: GenericUDAFResolver2 =
    +    if (isUDAFBridgeRequired) {
    +      new GenericUDAFBridge(funcWrapper.createFunction[UDAF]())
    +    } else {
    +      funcWrapper.createFunction[GenericUDAFResolver2]()
    +    }
    +
    +  @transient
    +  protected lazy val inputInspectors = children.map(toInspector).toArray
    +
    +  // The GenericUDAFEvaluator used to evaluate the window function.
    +  @transient
    +  protected lazy val evaluator: GenericUDAFEvaluator = {
    +    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
    +    resolver.getEvaluator(parameterInfo)
    +  }
    +
    +  // The object inspector of values returned from the Hive window fucntion.
    --- End diff --
    
    typo function
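    
    For context, a minimal usage sketch (not part of this patch) of the kind of query the
    new ResolveHiveWindowFunction rule and HiveWindowFunction wrapper are meant to handle;
    it assumes an existing HiveContext named hiveContext and a Hive table named src:
    
        // Illustrative only: rank() takes no explicit arguments and is resolved by the
        // new rule into a HiveWindowFunction backed by Hive's GenericUDAFRank.
        val ranked = hiveContext.sql(
          """SELECT key, value,
            |       rank() OVER (PARTITION BY key ORDER BY value) AS r
            |FROM src""".stripMargin)
        ranked.show()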

