HeartSaVioR commented on code in PR #43788:
URL: https://github.com/apache/spark/pull/43788#discussion_r1392113944
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -57,12 +58,31 @@ class StatePartitionReader(
partition.sourceOptions.operatorId, partition.partition,
partition.sourceOptions.storeName)
val stateStoreProviderId = StateStoreProviderId(stateStoreId,
partition.queryId)
+ val allStateStoreMetadata = new StateMetadataPartitionReader(
+ partition.sourceOptions.stateCheckpointLocation.getParent.toString,
hadoopConf)
+ .stateMetadata.toArray
+
+ val stateStoreMetadata = allStateStoreMetadata.filter(entry =>
+ entry.operatorId == partition.sourceOptions.operatorId
+ && entry.stateStoreName == partition.sourceOptions.storeName
+ )
+ val numColsPrefixKey = if (stateStoreMetadata.isEmpty) {
+ logWarning("Metadata for state store not found, possible cause is this
checkpoint " +
+ "is created by older version of spark. The state of session window
aggregation can't be " +
+ "read correctly without state metadata and runtime exception will be
thrown. " +
+ "Run the streaming query in newer spark version to generate state
metadata.")
+ 0
+ } else {
+ require(stateStoreMetadata.length == 1)
+ stateStoreMetadata.head.numColsPrefixKey
+ }
+
// TODO: This does not handle the case of session window aggregation; we
don't have an
Review Comment:
Let's remove this TODO comment as you're addressing this.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -494,6 +497,48 @@ abstract class StateDataSourceReadSuite extends
StateDataSourceTestBase with Ass
}
}
+ test("Session window aggregation") {
+ withTempDir { checkpointDir =>
+ val input = MemoryStream[(String, Long)]
+ val sessionWindow = session_window($"eventTime", "10 seconds")
+
+ val events = input.toDF()
+ .select($"_1".as("value"), $"_2".as("timestamp"))
+ .withColumn("eventTime", $"timestamp".cast("timestamp"))
+ .withWatermark("eventTime", "30 seconds")
+ .selectExpr("explode(split(value, ' ')) AS sessionId", "eventTime")
+
+ val streamingDf = events
+ .groupBy(sessionWindow as Symbol("session"), $"sessionId")
+ .agg(count("*").as("numEvents"))
+ .selectExpr("sessionId", "CAST(session.start AS LONG)",
"CAST(session.end AS LONG)",
+ "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS
durationMs",
+ "numEvents")
+
+ testStream(streamingDf, OutputMode.Complete())(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(input,
+ ("hello world spark streaming", 40L),
+ ("world hello structured streaming", 41L)
+ ),
+ CheckNewAnswer(
+ ("hello", 40, 51, 11, 2),
+ ("world", 40, 51, 11, 2),
+ ("streaming", 40, 51, 11, 2),
+ ("spark", 40, 50, 10, 1),
+ ("structured", 41, 51, 10, 1)
+ ),
+ StopStream
+ )
+
Review Comment:
nit: single empty line
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala:
##########
@@ -195,7 +195,7 @@ class StateMetadataPartitionReader(
}
}
- private lazy val stateMetadata: Iterator[StateMetadataTableEntry] = {
+ private[state] lazy val stateMetadata: Iterator[StateMetadataTableEntry] = {
Review Comment:
Is this change necessary?
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -494,6 +497,48 @@ abstract class StateDataSourceReadSuite extends
StateDataSourceTestBase with Ass
}
}
+ test("Session window aggregation") {
+ withTempDir { checkpointDir =>
+ val input = MemoryStream[(String, Long)]
Review Comment:
Shall we follow the pattern we use for StateDataSourceReadSuite vs
StateDataSourceTestBase? The main reason I put the query-execution part in
StateDataSourceTestBase is that we would likely want to reuse the query
between the test for read and the test for write.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -57,12 +58,31 @@ class StatePartitionReader(
partition.sourceOptions.operatorId, partition.partition,
partition.sourceOptions.storeName)
val stateStoreProviderId = StateStoreProviderId(stateStoreId,
partition.queryId)
+ val allStateStoreMetadata = new StateMetadataPartitionReader(
+ partition.sourceOptions.stateCheckpointLocation.getParent.toString,
hadoopConf)
+ .stateMetadata.toArray
+
+ val stateStoreMetadata = allStateStoreMetadata.filter(entry =>
Review Comment:
nit: change `(entry =>` to ` { entry =>` (note the spaces around `{`)
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -494,6 +497,48 @@ abstract class StateDataSourceReadSuite extends
StateDataSourceTestBase with Ass
}
}
+ test("Session window aggregation") {
+ withTempDir { checkpointDir =>
+ val input = MemoryStream[(String, Long)]
+ val sessionWindow = session_window($"eventTime", "10 seconds")
+
+ val events = input.toDF()
+ .select($"_1".as("value"), $"_2".as("timestamp"))
+ .withColumn("eventTime", $"timestamp".cast("timestamp"))
+ .withWatermark("eventTime", "30 seconds")
+ .selectExpr("explode(split(value, ' ')) AS sessionId", "eventTime")
+
+ val streamingDf = events
+ .groupBy(sessionWindow as Symbol("session"), $"sessionId")
+ .agg(count("*").as("numEvents"))
+ .selectExpr("sessionId", "CAST(session.start AS LONG)",
"CAST(session.end AS LONG)",
+ "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS
durationMs",
+ "numEvents")
+
+ testStream(streamingDf, OutputMode.Complete())(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(input,
+ ("hello world spark streaming", 40L),
+ ("world hello structured streaming", 41L)
+ ),
+ CheckNewAnswer(
+ ("hello", 40, 51, 11, 2),
+ ("world", 40, 51, 11, 2),
+ ("streaming", 40, 51, 11, 2),
+ ("spark", 40, 50, 10, 1),
+ ("structured", 41, 51, 10, 1)
+ ),
+ StopStream
+ )
+
+
+ val df = spark.read.format("statestore").load(checkpointDir.toString)
+ .selectExpr("key.sessionId as sessionId", "value.count as count")
Review Comment:
Shall we validate the full schema of the state, including the session
window part?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]