Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8587#discussion_r43453305
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala
 ---
    @@ -524,6 +525,116 @@ case class Sum(child: Expression) extends 
DeclarativeAggregate {
       override val evaluateExpression = Cast(currentSum, resultType)
     }
     
    +case class Corr(
    +    left: Expression,
    +    right: Expression,
    +    mutableAggBufferOffset: Int = 0,
    +    inputAggBufferOffset: Int = 0)
    +  extends ImperativeAggregate {
    +
    +  // The two expressions whose correlation is computed, in (left, right) order.
    +  def children: Seq[Expression] = left :: right :: Nil
    +
    +  def nullable: Boolean = false // NOTE(review): must stay in sync with eval; revisit if eval is changed to return null
    +
    +  def dataType: DataType = DoubleType // the aggregate's result is reported as a double
    +
    +  // One expected type per child, in the same order as `children` (left, right).
    +  // Declaring only a single entry left `right` without a declared input type, even though
    +  // update() unboxes both operands with asInstanceOf[Double].
    +  def inputTypes: Seq[AbstractDataType] = Seq(DoubleType, DoubleType)
    +
    +  def aggBufferSchema: StructType = 
StructType.fromAttributes(aggBufferAttributes) // schema is derived from aggBufferAttributes
    +
    +  // Fresh instances of every buffer attribute, produced via newInstance() on each call.
    +  def inputAggBufferAttributes: Seq[AttributeReference] =
    +    aggBufferAttributes.map(attr => attr.newInstance())
    +
    +  // Buffer layout, by position: five double slots (xAvg, yAvg, Ck, MkX, MkY)
    +  // followed by one long slot (count).
    +  val aggBufferAttributes: Seq[AttributeReference] = {
    +    val doubleSlots = Seq("xAvg", "yAvg", "Ck", "MkX", "MkY")
    +      .map(slotName => AttributeReference(slotName, DoubleType)())
    +    doubleSlots :+ AttributeReference("count", LongType)()
    +  }
    +
    +  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: 
Int): ImperativeAggregate =
    +    copy(mutableAggBufferOffset = newMutableAggBufferOffset) // case-class copy; only the mutable offset differs
    +
    +  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): 
ImperativeAggregate =
    +    copy(inputAggBufferOffset = newInputAggBufferOffset) // case-class copy; only the input offset differs
    +
    +  /**
    +   * Zeroes this aggregate's six slots in `buffer`, starting at `mutableAggBufferOffset`:
    +   * five double slots (offsets 0-4) followed by the long count slot (offset 5).
    +   */
    +  override def initialize(buffer: MutableRow): Unit = {
    +    // foreach rather than map: the loop is run only for its side effect, so there is
    +    // no reason to build and discard an IndexedSeq[Unit].
    +    (0 until 5).foreach(idx => buffer.setDouble(mutableAggBufferOffset + idx, 0.0))
    +    buffer.setLong(mutableAggBufferOffset + 5, 0L)
    +  }
    +
    +  /**
    +   * Folds one input row into the running statistics stored in `buffer`
    +   * (slots xAvg, yAvg, Ck, MkX, MkY, count, starting at `mutableAggBufferOffset`).
    +   *
    +   * Rows for which either operand evaluates to null are skipped and leave the buffer
    +   * untouched. Without the guard, `null.asInstanceOf[Double]` unboxes to 0.0, so a null
    +   * operand would silently be counted as an observation with value 0.0.
    +   */
    +  override def update(buffer: MutableRow, input: InternalRow): Unit = {
    +    val leftEval = left.eval(input)
    +    val rightEval = right.eval(input)
    +
    +    if (leftEval != null && rightEval != null) {
    +      val x = leftEval.asInstanceOf[Double]
    +      val y = rightEval.asInstanceOf[Double]
    +
    +      var xAvg = buffer.getDouble(mutableAggBufferOffset)
    +      var yAvg = buffer.getDouble(mutableAggBufferOffset + 1)
    +      var Ck = buffer.getDouble(mutableAggBufferOffset + 2)
    +      var MkX = buffer.getDouble(mutableAggBufferOffset + 3)
    +      var MkY = buffer.getDouble(mutableAggBufferOffset + 4)
    +      var count = buffer.getLong(mutableAggBufferOffset + 5)
    +
    +      // The deltas use the means from BEFORE this row is added; the second factor of
    +      // each product below uses the means from AFTER the row is added.
    +      val deltaX = x - xAvg
    +      val deltaY = y - yAvg
    +      count += 1
    +      xAvg += deltaX / count
    +      yAvg += deltaY / count
    +      Ck += deltaX * (y - yAvg)
    +      MkX += deltaX * (x - xAvg)
    +      MkY += deltaY * (y - yAvg)
    +
    +      buffer.setDouble(mutableAggBufferOffset, xAvg)
    +      buffer.setDouble(mutableAggBufferOffset + 1, yAvg)
    +      buffer.setDouble(mutableAggBufferOffset + 2, Ck)
    +      buffer.setDouble(mutableAggBufferOffset + 3, MkX)
    +      buffer.setDouble(mutableAggBufferOffset + 4, MkY)
    +      buffer.setLong(mutableAggBufferOffset + 5, count)
    +    }
    +  }
    +
    +  /**
    +   * Combines the partial statistics in `buffer2` (read at `inputAggBufferOffset`) into
    +   * `buffer1` (read and written at `mutableAggBufferOffset`). Nothing is written when
    +   * `buffer2` holds a count of zero or less.
    +   */
    +  override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
    +    val n2 = buffer2.getLong(inputAggBufferOffset + 5)
    +
    +    if (n2 > 0) {
    +      val meanX1 = buffer1.getDouble(mutableAggBufferOffset)
    +      val meanY1 = buffer1.getDouble(mutableAggBufferOffset + 1)
    +      val coMoment1 = buffer1.getDouble(mutableAggBufferOffset + 2)
    +      val momentX1 = buffer1.getDouble(mutableAggBufferOffset + 3)
    +      val momentY1 = buffer1.getDouble(mutableAggBufferOffset + 4)
    +      val n1 = buffer1.getLong(mutableAggBufferOffset + 5)
    +
    +      val meanX2 = buffer2.getDouble(inputAggBufferOffset)
    +      val meanY2 = buffer2.getDouble(inputAggBufferOffset + 1)
    +      val coMoment2 = buffer2.getDouble(inputAggBufferOffset + 2)
    +      val momentX2 = buffer2.getDouble(inputAggBufferOffset + 3)
    +      val momentY2 = buffer2.getDouble(inputAggBufferOffset + 4)
    +
    +      val n = n1 + n2
    +      val dx = meanX1 - meanX2
    +      val dy = meanY1 - meanY2
    +
    +      // Cross-partition correction term, evaluated strictly left to right:
    +      // ((product * n1) / n) * n2.
    +      def pooled(product: Double): Double = product * n1 / n * n2
    +
    +      val mergedCoMoment = coMoment1 + (coMoment2 + pooled(dx * dy))
    +      val mergedMeanX = (meanX1 * n1 + meanX2 * n2) / n
    +      val mergedMeanY = (meanY1 * n1 + meanY2 * n2) / n
    +      val mergedMomentX = momentX1 + (momentX2 + pooled(dx * dx))
    +      val mergedMomentY = momentY1 + (momentY2 + pooled(dy * dy))
    +
    +      buffer1.setDouble(mutableAggBufferOffset, mergedMeanX)
    +      buffer1.setDouble(mutableAggBufferOffset + 1, mergedMeanY)
    +      buffer1.setDouble(mutableAggBufferOffset + 2, mergedCoMoment)
    +      buffer1.setDouble(mutableAggBufferOffset + 3, mergedMomentX)
    +      buffer1.setDouble(mutableAggBufferOffset + 4, mergedMomentY)
    +      buffer1.setLong(mutableAggBufferOffset + 5, n)
    +    }
    +  }
    +
    +  override def eval(buffer: InternalRow): Any = {
    +    val Ck = buffer.getDouble(mutableAggBufferOffset + 2)
    +    val MkX = buffer.getDouble(mutableAggBufferOffset + 3)
    +    val MkY = buffer.getDouble(mutableAggBufferOffset + 4)
    +    Ck / math.sqrt(MkX * MkY)
    --- End diff --
    
    Judging from the failed HiveCompatibilitySuite test, it looks like Hive 
returns NULL in this case. I think we should follow Hive's behavior here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to