Morning all, I've been struggling with this for a while and can't work out what I'm doing wrong.
I want to apply the corr function over the following DataFrame:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{corr, sum}

    val sampleColumns = Seq("group", "id", "count1", "count2", "orderCount")

    val sampleSet = Seq(
      ("group1", "id1", 1, 1, 6),
      ("group1", "id2", 2, 2, 5),
      ("group1", "id3", 3, 3, 4),
      ("group2", "id4", 4, 4, 3),
      ("group2", "id5", 5, 5, 2),
      ("group2", "id6", 6, 6, 1)
    )

    val initialSet = sparkSession
      .createDataFrame(sampleSet)
      .toDF(sampleColumns: _*)

Output of initialSet.show():

    +------+---+------+------+----------+
    | group| id|count1|count2|orderCount|
    +------+---+------+------+----------+
    |group1|id1|     1|     1|         6|
    |group1|id2|     2|     2|         5|
    |group1|id3|     3|     3|         4|
    |group2|id4|     4|     4|         3|
    |group2|id5|     5|     5|         2|
    |group2|id6|     6|     6|         1|
    +------+---+------+------+----------+

I then group by "group" and apply corr over a window:

    // Running window per group, ordered by the summed order count
    val initialSetWindow = Window
      .partitionBy("group")
      .orderBy("orderCountSum")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    val groupedSet = initialSet
      .groupBy("group")
      .agg(
        sum("count1").as("count1Sum"),
        sum("count2").as("count2Sum"),
        sum("orderCount").as("orderCountSum")
      )
      .withColumn("cf", corr("count1Sum", "count2Sum").over(initialSetWindow))

Output of groupedSet.show():

    +------+---------+---------+-------------+----+
    | group|count1Sum|count2Sum|orderCountSum|  cf|
    +------+---------+---------+-------------+----+
    |group1|        6|        6|           15|null|
    |group2|       15|       15|            6|null|
    +------+---------+---------+-------------+----+

When I apply the corr function this way, the values in cf come out as null (see the output above).

The question is: *how can I apply corr to each of the rows within their subgroup (Window)?*

I would like to obtain the corr value per row and per subgroup (group1 and group2), and it should still work if more nested IDs were added (group + id).
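
One thing that may explain the nulls: after groupBy("group") each group collapses to a single row, so a window partitioned by "group" only ever sees one (count1Sum, count2Sum) pair, and a correlation over a single point is undefined. Below is a minimal sketch, not a definitive fix, that applies corr over the window on the ungrouped rows instead. It is written in spark-shell/script style like the snippets above. The local SparkSession and the names spark, perGroupRunning, perGroupWhole, cfRunning and cfGroup are assumptions introduced for illustration, as is the choice to order by orderCount.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.corr

// Assumption: a local session, created here only so the sketch is self-contained.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("corr-over-window-sketch")
  .getOrCreate()

val sampleColumns = Seq("group", "id", "count1", "count2", "orderCount")
val sampleSet = Seq(
  ("group1", "id1", 1, 1, 6),
  ("group1", "id2", 2, 2, 5),
  ("group1", "id3", 3, 3, 4),
  ("group2", "id4", 4, 4, 3),
  ("group2", "id5", 5, 5, 2),
  ("group2", "id6", 6, 6, 1)
)
val initialSet = spark.createDataFrame(sampleSet).toDF(sampleColumns: _*)

// Running frame: each row sees all rows of its group up to and including itself.
val perGroupRunning = Window
  .partitionBy("group")
  .orderBy("orderCount")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

// Whole-partition frame: every row sees all rows of its group.
val perGroupWhole = Window.partitionBy("group")

val withCorr = initialSet
  // Still undefined on the first row of each group (only one point in the frame).
  .withColumn("cfRunning", corr("count1", "count2").over(perGroupRunning))
  // One correlation value per group, repeated on each of its rows.
  .withColumn("cfGroup", corr("count1", "count2").over(perGroupWhole))

withCorr.show()
```

With the running frame, the first row of every partition still has a single point and so no defined correlation; the whole-partition frame avoids that at the cost of giving the same value to every row in the group. For nested subgroups, the same idea should carry over by passing more columns to partitionBy (for example "group" and "id"), provided each partition keeps at least two rows.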