viirya commented on a change in pull request #33077:
URL: https://github.com/apache/spark/pull/33077#discussion_r669315813



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MergingSortWithSessionWindowStateIterator.scala
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, 
UnsafeRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.streaming.state.{ReadStateStore, 
StreamingSessionWindowStateManager}
+
+/**
+ * This class technically does the merge sort between input rows and existing 
sessions in state,
+ * to optimize the cost of sort on "input rows + existing sessions". This is 
based on the
+ * precondition that input rows are sorted by "group keys + start time of 
session window".
+ *
+ * This only materializes the existing sessions into memory, which are tend to 
be not many per
+ * group key. The cost of sorting existing sessions would be also minor based 
on the assumption.
+ *
+ * The output rows are sorted with "group keys + start time of session 
window", which is same as
+ * the sort condition on input rows.
+ */
+class MergingSortWithSessionWindowStateIterator(
+    iter: Iterator[InternalRow],
+    stateManager: StreamingSessionWindowStateManager,
+    store: ReadStateStore,
+    groupWithoutSessionExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    inputSchema: Seq[Attribute]) extends Iterator[InternalRow] with Logging {
+
+  private val keysProjection: UnsafeProjection = 
GenerateUnsafeProjection.generate(
+    groupWithoutSessionExpressions, inputSchema)
+  private val sessionProjection: UnsafeProjection =
+    GenerateUnsafeProjection.generate(Seq(sessionExpression), inputSchema)
+
+  private case class SessionRowInformation(
+      keys: UnsafeRow,
+      sessionStart: Long,
+      sessionEnd: Long,
+      row: InternalRow)
+
+  private object SessionRowInformation {
+    def of(row: InternalRow): SessionRowInformation = {
+      val keys = keysProjection(row).copy()
+      val session = sessionProjection(row).copy()
+      val sessionRow = session.getStruct(0, 2)
+      val sessionStart = sessionRow.getLong(0)
+      val sessionEnd = sessionRow.getLong(1)
+
+      SessionRowInformation(keys, sessionStart, sessionEnd, row)
+    }
+  }
+
+  private var currentRow: SessionRowInformation = _
+  private var currentStateRow: SessionRowInformation = _
+  private var currentStateIter: Iterator[InternalRow] = _
+  private var currentStateFetchedKey: UnsafeRow = _

Review comment:
       A few suggestions:
   
   currentRow -> currentRowFromInput
   currentStateRow -> currentRowFromState
   currentStateIter -> sessionIterFromState
   currentStateFetchedKey -> currentSessionKey

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MergingSortWithSessionWindowStateIterator.scala
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, 
UnsafeRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.streaming.state.{ReadStateStore, 
StreamingSessionWindowStateManager}
+
+/**
+ * This class technically does the merge sort between input rows and existing 
sessions in state,
+ * to optimize the cost of sort on "input rows + existing sessions". This is 
based on the
+ * precondition that input rows are sorted by "group keys + start time of 
session window".
+ *
+ * This only materializes the existing sessions into memory, which are tend to 
be not many per
+ * group key. The cost of sorting existing sessions would be also minor based 
on the assumption.
+ *
+ * The output rows are sorted with "group keys + start time of session 
window", which is same as
+ * the sort condition on input rows.
+ */
+class MergingSortWithSessionWindowStateIterator(
+    iter: Iterator[InternalRow],
+    stateManager: StreamingSessionWindowStateManager,
+    store: ReadStateStore,
+    groupWithoutSessionExpressions: Seq[Attribute],
+    sessionExpression: Attribute,
+    inputSchema: Seq[Attribute]) extends Iterator[InternalRow] with Logging {
+
+  private val keysProjection: UnsafeProjection = 
GenerateUnsafeProjection.generate(
+    groupWithoutSessionExpressions, inputSchema)
+  private val sessionProjection: UnsafeProjection =
+    GenerateUnsafeProjection.generate(Seq(sessionExpression), inputSchema)
+
+  private case class SessionRowInformation(
+      keys: UnsafeRow,
+      sessionStart: Long,
+      sessionEnd: Long,
+      row: InternalRow)
+
+  private object SessionRowInformation {
+    def of(row: InternalRow): SessionRowInformation = {
+      val keys = keysProjection(row).copy()
+      val session = sessionProjection(row).copy()
+      val sessionRow = session.getStruct(0, 2)
+      val sessionStart = sessionRow.getLong(0)
+      val sessionEnd = sessionRow.getLong(1)
+
+      SessionRowInformation(keys, sessionStart, sessionEnd, row)
+    }
+  }
+
+  private var currentRow: SessionRowInformation = _
+  private var currentStateRow: SessionRowInformation = _
+  private var currentStateIter: Iterator[InternalRow] = _
+  private var currentStateFetchedKey: UnsafeRow = _
+
+  override def hasNext: Boolean = {
+    currentRow != null || currentStateRow != null ||
+      (currentStateIter != null && currentStateIter.hasNext) || iter.hasNext
+  }
+
+  override def next(): InternalRow = {
+    if (currentRow == null) {
+      mayFillCurrentRow()
+    }
+
+    if (currentStateRow == null) {
+      mayFillCurrentStateRow()
+    }
+
+    if (currentRow == null && currentStateRow == null) {
+      throw new IllegalStateException("No Row to provide in next() which 
should not happen!")
+    }
+
+    // return current row vs current state row, should return smaller key, 
earlier session start
+    val returnCurrentRow: Boolean = {
+      if (currentRow == null) {
+        false
+      } else if (currentStateRow == null) {
+        true
+      } else {
+        // compare
+        if (currentRow.keys != currentStateRow.keys) {
+          // state row cannot advance to row in input, so state row should be 
lower

Review comment:
       Does this case mean, the input iterator advances to new keys other than 
current sessions from the state? So we should output from current sessions 
until it ends and retrieves new sessions from the state again? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to