HeartSaVioR commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r596658962
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -438,6 +443,434 @@ case class StateStoreSaveExec(
}
}
+/**
+ * For each input tuple, the key is calculated and the value from the
[[StateStore]] is added
+ * to the stream (in addition to the input tuple) if present.
+ *
+ * The keyExpressions should exclude the sessionWindow expression.
+ */
+case class SessionWindowStateStoreRestoreExec(
+ keyExpressions: Seq[Attribute],
+ timeExpression: Attribute,
+ stateInfo: Option[StatefulOperatorStateInfo],
+ child: SparkPlan)
+ extends UnaryExecNode with StateStoreReader {
+
+ import StreamingSessionWindowHelper._
+
+ private val storeConf = new StateStoreConf(sqlContext.conf)
+ private val hadoopConfBcast = sparkContext.broadcast(
+ new SerializableConfiguration(SessionState.newHadoopConf(
+ sparkContext.hadoopConfiguration, sqlContext.conf)))
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ val numOutputRows = longMetric("numOutputRows")
+ assert(keyExpressions.nonEmpty, "Grouping key must be specified when using
sessionWindow")
+
+ val stateVersion =
conf.getConf(SQLConf.STREAMING_SESSION_WINDOW_STATE_FORMAT_VERSION)
+ val stateStoreCoord =
sqlContext.sessionState.streamingQueryManager.stateStoreCoordinator
+
+ child.execute().mapPartitionsWithStateStoreAwareRDD(
+ getStateInfo,
+ StreamingSessionWindowStateManager.allStateStoreNames(stateVersion),
+ stateStoreCoord) { case (partitionId, iter) =>
+
+ val stateStoreManager =
StreamingSessionWindowStateManager.createStateManager(
+ keyExpressions, timeExpression, child.output, child.output, stateInfo,
storeConf,
+ hadoopConfBcast.value.value, partitionId, stateVersion)
+
+ var preKey: UnsafeRow = null
+ iter.flatMap { row =>
+ val key = stateStoreManager.getKey(row)
+ val startTime = stateStoreManager.getStartTime(row)
+ var savedState: Seq[UnsafeRow] = null
+
+ // For one key, we only get once from state store.
+ // e.g. the iterator may contains elements below
+ // | key | window | value |
+ // | a | w1 | xx |
+ // | a | w2 | xx |
+ // | b | w3 | xx |
+ // | c | w4 | xx |
+ // for the key a of different window, we only got once
+ // from statestore, otherwise will get error result
+ if (preKey == null || key != preKey) {
+ savedState = stateStoreManager.getStates(key)
+
+ // must copy the key. The key is a UnsafeRow and pointer to some
memory
+ // when next `getKey` invoke the value of the memory will change, so
the value of
+ // preKey will change automatically
+ //
+ // e.g. If the key = a, assign preKey by key without copy, when next
step of flapMap
+ // after the `getKey` the key = b, the preKey also is b, however,
the expected value
+ // of preKey is a
+ preKey = key.copy
+ }
+
+ if (savedState == null) {
+ numOutputRows += 1
+ Seq(row)
+ } else {
+ val outputs = savedState :+ row
Review comment:
As I said, I don't think this preserves the overall ordering of rows, so an
additional sort is required: `savedState` can end up injected anywhere among the input rows.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]