HeartSaVioR commented on a change in pull request #31986:
URL: https://github.com/apache/spark/pull/31986#discussion_r629795680



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing 
neighbor rows and
+ * determining rows belong to the same session window. The number of input 
rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of 
session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use 
[[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both 
calculations
+ * determining session windows and aggregating rows in session window 
altogether.
+ *
+ * Refer [[UpdatingSessionsIterator]] for more details.
+ */
+case class UpdatingSessionsExec(
+    keyExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    child: SparkPlan) extends UnaryExecNode {
+
+  private val groupingWithoutSessionExpression = keyExpressions.filterNot {
+    p => p.semanticEquals(sessionExpression)
+  }
+  private val groupingWithoutSessionAttributes =
+    groupingWithoutSessionExpression.map(_.toAttribute)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val inMemoryThreshold = 
sqlContext.conf.sessionWindowBufferInMemoryThreshold
+    val spillThreshold = sqlContext.conf.sessionWindowBufferSpillThreshold
+
+    child.execute().mapPartitions { iter =>
+      new UpdatingSessionsIterator(iter, keyExpressions, sessionExpression,
+        child.output, inMemoryThreshold, spillThreshold)
+    }
+  }
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (groupingWithoutSessionExpression.isEmpty) {
+      AllTuples :: Nil
+    } else {
+      ClusteredDistribution(groupingWithoutSessionExpression) :: Nil
+    }
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    Seq((groupingWithoutSessionAttributes ++ Seq(sessionExpression))

Review comment:
       Nice finding. Will update.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+
+/**
+ * This class calculates and updates the session window for each element in 
the given iterator,
+ * which makes elements in the same session window having same session spec. 
Downstream can apply
+ * aggregation to finally merge these elements bound to the same session 
window.
+ *
+ * This class works on the precondition that given iterator is sorted by 
"group keys + start time
+ * of session window", and this iterator still retains the characteristic of 
the sort.
+ *
+ * This class copies the elements to safely update on each element, as well as 
buffers elements
+ * which are bound to the same session window. Due to such overheads, 
[[MergingSessionsIterator]]
+ * should be used whenever possible.
+ */
+class UpdatingSessionsIterator(
+    iter: Iterator[InternalRow],
+    groupingExpressions: Seq[NamedExpression],
+    sessionExpression: NamedExpression,
+    inputSchema: Seq[Attribute],
+    inMemoryThreshold: Int,
+    spillThreshold: Int) extends Iterator[InternalRow] {
+
+  private val groupingWithoutSession: Seq[NamedExpression] =
+    groupingExpressions.diff(Seq(sessionExpression))
+  private val groupingWithoutSessionAttributes: Seq[Attribute] =
+    groupingWithoutSession.map(_.toAttribute)
+  private[this] val groupingWithoutSessionProjection: UnsafeProjection =
+    UnsafeProjection.create(groupingWithoutSession, inputSchema)
+
+  private val valuesExpressions: Seq[Attribute] = 
inputSchema.diff(groupingWithoutSession)

Review comment:
       Yes but looks like not necessary. Will fix.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+
+/**
+ * This class calculates and updates the session window for each element in 
the given iterator,
+ * which makes elements in the same session window having same session spec. 
Downstream can apply
+ * aggregation to finally merge these elements bound to the same session 
window.
+ *
+ * This class works on the precondition that given iterator is sorted by 
"group keys + start time
+ * of session window", and this iterator still retains the characteristic of 
the sort.
+ *
+ * This class copies the elements to safely update on each element, as well as 
buffers elements
+ * which are bound to the same session window. Due to such overheads, 
[[MergingSessionsIterator]]
+ * should be used whenever possible.
+ */
+class UpdatingSessionsIterator(
+    iter: Iterator[InternalRow],
+    groupingExpressions: Seq[NamedExpression],
+    sessionExpression: NamedExpression,
+    inputSchema: Seq[Attribute],
+    inMemoryThreshold: Int,
+    spillThreshold: Int) extends Iterator[InternalRow] {
+
+  private val groupingWithoutSession: Seq[NamedExpression] =
+    groupingExpressions.diff(Seq(sessionExpression))
+  private val groupingWithoutSessionAttributes: Seq[Attribute] =
+    groupingWithoutSession.map(_.toAttribute)
+  private[this] val groupingWithoutSessionProjection: UnsafeProjection =
+    UnsafeProjection.create(groupingWithoutSession, inputSchema)
+
+  private val valuesExpressions: Seq[Attribute] = 
inputSchema.diff(groupingWithoutSession)
+
+  private[this] val sessionProjection: UnsafeProjection =
+    UnsafeProjection.create(Seq(sessionExpression), inputSchema)
+
+  // Below three variables hold the information for "current session".
+  private var currentKeys: InternalRow = _
+  private var currentSession: UnsafeRow = _
+  private var rowsForCurrentSession: ExternalAppendOnlyUnsafeRowArray = _
+
+  // Below two variables hold the information for "returning rows". The reason 
we have this in
+  // addition to "current session" is that there could be the chance that 
iterator for returning
+  // rows on previous session wasn't fully consumed and there's a new session 
being started.
+  private var returnRows: ExternalAppendOnlyUnsafeRowArray = _
+  private var returnRowsIter: Iterator[InternalRow] = _
+
+  // Mark this to raise error on any operations after the iterator figures out 
the error.
+  private var errorOnIterator: Boolean = false
+
+  private val processedKeys: mutable.HashSet[InternalRow] = new 
mutable.HashSet[InternalRow]()
+
+  override def hasNext: Boolean = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return true
+    }
+
+    if (returnRowsIter != null) {
+      returnRowsIter = null
+      returnRows.clear()
+    }
+
+    iter.hasNext
+  }
+
+  override def next(): InternalRow = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return returnRowsIter.next()
+    }
+
+    var exitCondition = false
+    while (iter.hasNext && !exitCondition) {
+      // we are going to modify the row, so we should make sure multiple 
objects are not
+      // referencing same memory, which could be possible when optimizing 
iterator
+      // without this, multiple rows in same key will be returned with same 
content
+      val row = iter.next().copy()
+
+      val keys = groupingWithoutSessionProjection(row)
+      val session = sessionProjection(row)
+      val sessionStruct = session.getStruct(0, 2)
+      val sessionStart = getSessionStart(sessionStruct)
+      val sessionEnd = getSessionEnd(sessionStruct)
+
+      if (currentKeys == null) {
+        startNewSession(row, keys, sessionStruct)
+      } else if (keys != currentKeys) {
+        closeCurrentSession(keyChanged = true)
+        processedKeys.add(currentKeys)
+        startNewSession(row, keys, sessionStruct)
+        exitCondition = true
+      } else {
+        if (sessionStart < getSessionStart(currentSession)) {
+          handleBrokenPreconditionForSort()
+        } else if (sessionStart <= getSessionEnd(currentSession)) {
+          // expanding session length if needed
+          expandEndOfCurrentSession(sessionEnd)
+          rowsForCurrentSession.add(row.asInstanceOf[UnsafeRow])
+        } else {
+          closeCurrentSession(keyChanged = false)
+          startNewSession(row, keys, sessionStruct)
+          exitCondition = true

Review comment:
       Yes, every row has session end which session gap is applied. Merging 
sessions would expand session end as later one which session gap is applied.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing 
neighbor rows and
+ * determining rows belong to the same session window. The number of input 
rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of 
session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use 
[[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both 
calculations
+ * determining session windows and aggregating rows in session window 
altogether.
+ *
+ * Refer [[UpdatingSessionsIterator]] for more details.
+ */
+case class UpdatingSessionsExec(
+    keyExpressions: Seq[Attribute],

Review comment:
       Yeah I agree the name is confused. Let me rename to 
`groupingExpressions`. Thanks!

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+/**
+ * This node updates the session window spec of each input rows via analyzing 
neighbor rows and
+ * determining rows belong to the same session window. The number of input 
rows remains the same.
+ * This node requires sort on input rows by group keys + the start time of 
session window.
+ *
+ * There are lots of overhead compared to [[MergingSessionsExec]]. Use 
[[MergingSessionsExec]]
+ * instead whenever possible. Use this node only when we cannot apply both 
calculations
+ * determining session windows and aggregating rows in session window 
altogether.

Review comment:
       I don't list the case explicitly as we use UpdatingSessionsExec whenever 
needed; you can find the case in original PR, but please let me know if we'd 
like to explicitly specify some cases here as well.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsIterator.scala
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+
+/**
+ * This class calculates and updates the session window for each element in 
the given iterator,
+ * which makes elements in the same session window having same session spec. 
Downstream can apply
+ * aggregation to finally merge these elements bound to the same session 
window.
+ *
+ * This class works on the precondition that given iterator is sorted by 
"group keys + start time
+ * of session window", and this iterator still retains the characteristic of 
the sort.
+ *
+ * This class copies the elements to safely update on each element, as well as 
buffers elements
+ * which are bound to the same session window. Due to such overheads, 
[[MergingSessionsIterator]]
+ * should be used whenever possible.
+ */
+class UpdatingSessionsIterator(
+    iter: Iterator[InternalRow],
+    groupingExpressions: Seq[NamedExpression],
+    sessionExpression: NamedExpression,
+    inputSchema: Seq[Attribute],
+    inMemoryThreshold: Int,
+    spillThreshold: Int) extends Iterator[InternalRow] {
+
+  private val groupingWithoutSession: Seq[NamedExpression] =
+    groupingExpressions.diff(Seq(sessionExpression))
+  private val groupingWithoutSessionAttributes: Seq[Attribute] =
+    groupingWithoutSession.map(_.toAttribute)
+  private[this] val groupingWithoutSessionProjection: UnsafeProjection =
+    UnsafeProjection.create(groupingWithoutSession, inputSchema)
+
+  private val valuesExpressions: Seq[Attribute] = 
inputSchema.diff(groupingWithoutSession)
+
+  private[this] val sessionProjection: UnsafeProjection =
+    UnsafeProjection.create(Seq(sessionExpression), inputSchema)
+
+  // Below three variables hold the information for "current session".
+  private var currentKeys: InternalRow = _
+  private var currentSession: UnsafeRow = _
+  private var rowsForCurrentSession: ExternalAppendOnlyUnsafeRowArray = _
+
+  // Below two variables hold the information for "returning rows". The reason 
we have this in
+  // addition to "current session" is that there could be the chance that 
iterator for returning
+  // rows on previous session wasn't fully consumed and there's a new session 
being started.
+  private var returnRows: ExternalAppendOnlyUnsafeRowArray = _
+  private var returnRowsIter: Iterator[InternalRow] = _
+
+  // Mark this to raise error on any operations after the iterator figures out 
the error.
+  private var errorOnIterator: Boolean = false
+
+  private val processedKeys: mutable.HashSet[InternalRow] = new 
mutable.HashSet[InternalRow]()
+
+  override def hasNext: Boolean = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return true
+    }
+
+    if (returnRowsIter != null) {
+      returnRowsIter = null
+      returnRows.clear()
+    }
+
+    iter.hasNext
+  }
+
+  override def next(): InternalRow = {
+    assertIteratorNotCorrupted()
+
+    if (returnRowsIter != null && returnRowsIter.hasNext) {
+      return returnRowsIter.next()
+    }
+
+    var exitCondition = false
+    while (iter.hasNext && !exitCondition) {
+      // we are going to modify the row, so we should make sure multiple 
objects are not
+      // referencing same memory, which could be possible when optimizing 
iterator
+      // without this, multiple rows in same key will be returned with same 
content
+      val row = iter.next().copy()
+
+      val keys = groupingWithoutSessionProjection(row)
+      val session = sessionProjection(row)
+      val sessionStruct = session.getStruct(0, 2)
+      val sessionStart = getSessionStart(sessionStruct)
+      val sessionEnd = getSessionEnd(sessionStruct)
+
+      if (currentKeys == null) {
+        startNewSession(row, keys, sessionStruct)
+      } else if (keys != currentKeys) {
+        closeCurrentSession(keyChanged = true)
+        processedKeys.add(currentKeys)
+        startNewSession(row, keys, sessionStruct)
+        exitCondition = true
+      } else {
+        if (sessionStart < getSessionStart(currentSession)) {
+          handleBrokenPreconditionForSort()
+        } else if (sessionStart <= getSessionEnd(currentSession)) {
+          // expanding session length if needed
+          expandEndOfCurrentSession(sessionEnd)
+          rowsForCurrentSession.add(row.asInstanceOf[UnsafeRow])
+        } else {
+          closeCurrentSession(keyChanged = false)
+          startNewSession(row, keys, sessionStruct)
+          exitCondition = true
+        }
+      }
+    }
+
+    if (!iter.hasNext) {
+      // no further row: closing session
+      closeCurrentSession(keyChanged = false)
+    }
+
+    // here returnRowsIter should be able to provide at least one row
+    require(returnRowsIter != null && returnRowsIter.hasNext)
+
+    returnRowsIter.next()
+  }
+
+  private def startNewSession(
+      currentRow: InternalRow,
+      groupingKey: UnsafeRow,
+      sessionStruct: UnsafeRow): Unit = {
+    if (processedKeys.contains(groupingKey)) {
+      handleBrokenPreconditionForSort()
+    }
+
+    currentKeys = groupingKey.copy()
+    currentSession = sessionStruct.copy()
+
+    rowsForCurrentSession = new 
ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+    rowsForCurrentSession.add(currentRow.asInstanceOf[UnsafeRow])
+  }
+
+  private def getSessionStart(sessionStruct: UnsafeRow): Long = {
+    sessionStruct.getLong(0)
+  }
+
+  private def getSessionEnd(sessionStruct: UnsafeRow): Long = {
+    sessionStruct.getLong(1)
+  }
+
+  def updateSessionEnd(sessionStruct: UnsafeRow, sessionEnd: Long): Unit = {
+    sessionStruct.setLong(1, sessionEnd)
+  }
+
+  private def expandEndOfCurrentSession(sessionEnd: Long): Unit = {
+    if (sessionEnd > getSessionEnd(currentSession)) {
+      updateSessionEnd(currentSession, sessionEnd)
+    }
+  }
+
+  private def handleBrokenPreconditionForSort(): Unit = {
+    errorOnIterator = true
+    throw new IllegalStateException("The iterator must be sorted by key and 
session start!")
+  }
+
+  private val join = new JoinedRow
+  private val join2 = new JoinedRow
+
+  private val groupingKeyProj = 
GenerateUnsafeProjection.generate(groupingExpressions,
+    groupingWithoutSessionAttributes :+ sessionExpression.toAttribute)
+  private val valueProj = GenerateUnsafeProjection.generate(valuesExpressions, 
inputSchema)
+  private val restoreProj = GenerateUnsafeProjection.generate(inputSchema,
+    groupingExpressions.map(_.toAttribute) ++ 
valuesExpressions.map(_.toAttribute))
+
+  private def generateGroupingKey(): UnsafeRow = {
+    val newRow = new 
SpecificInternalRow(Seq(sessionExpression.toAttribute).toStructType)
+    newRow.update(0, currentSession)
+    val joined = join(currentKeys, newRow)
+
+    groupingKeyProj(joined)
+  }
+
+  private def closeCurrentSession(keyChanged: Boolean): Unit = {
+    assert(returnRowsIter == null || !returnRowsIter.hasNext)
+
+    returnRows = rowsForCurrentSession
+    rowsForCurrentSession = null
+
+    val groupingKey = generateGroupingKey()
+
+    val currentRowsIter = returnRows.generateIterator().map { internalRow =>
+      val valueRow = valueProj(internalRow)
+      restoreProj(join2(groupingKey, valueRow)).copy()

Review comment:
       Yeah just removed the projection of groupingKeys. Just needed to join 
the rows. Thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to