zifeif2 commented on code in PR #53386:
URL: https://github.com/apache/spark/pull/53386#discussion_r2625691797
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatePartitionAllColumnFamiliesWriterSuite.scala:
##########
@@ -329,15 +389,42 @@ class StatePartitionAllColumnFamiliesWriterSuite extends StateDataSourceTestBase
         performRoundTripTest(
           sourceDir.getAbsolutePath,
           targetDir.getAbsolutePath,
-          keySchema,
-          valueSchema,
-          keyStateEncoderSpec
+          createSingleColumnFamilySchemaMap(keySchema, valueSchema, keyStateEncoderSpec)
         )
       }
     }
   }
 }
+  private val keyToNumValuesColFamilyNames = Seq("left-keyToNumValues", "right-keyToNumValues")
Review Comment:
They are the same! Okay, I'll move them somewhere shared.
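Something like hoisting them into the suite's companion object could work (just a sketch, not necessarily where the PR will end up putting them):

```scala
// Hypothetical placement: one shared definition that both join-side tests
// can reference instead of each declaring its own copy.
object StatePartitionAllColumnFamiliesWriterSuite {
  private[state] val KeyToNumValuesColFamilyNames: Seq[String] =
    Seq("left-keyToNumValues", "right-keyToNumValues")
}
```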
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StatePartitionWriter.scala:
##########
@@ -123,6 +159,12 @@ class StatePartitionAllColumnFamiliesWriter(
         val valueRow = new UnsafeRow(columnFamilyToValueSchemaLenMap(colFamilyName))
         valueRow.pointTo(valueBytes, valueBytes.length)
-        stateStore.put(keyRow, valueRow, colFamilyName)
+        if (columnFamilyToSchemaMap(colFamilyName).useMultipleValuesPerKey) {
+          // if a column family useMultipleValuesPerKey (e.g. ListType), we will
+          // write with 1 put followed by merge
+          stateStore.merge(keyRow, valueRow, colFamilyName)
Review Comment:
Ah, I should probably update the comment. I tried using just merge (without an
initial put) and the test still passed, though.
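That would be consistent with the multi-value semantics: merge on an absent key creates the entry, and later merges append. A minimal sketch of why replaying with merge alone should suffice (the "events" column family and row names are made up for illustration):

```scala
// Replaying every (key, value) pair with merge alone rebuilds the full list,
// so no leading put is needed. merge/valuesIterator are the StateStore
// multi-value APIs; the names below are illustrative only.
stateStore.merge(keyRow, firstValueRow, "events")  // key absent: creates [v1]
stateStore.merge(keyRow, secondValueRow, "events") // appends: [v1, v2]
val restored = stateStore.valuesIterator(keyRow, "events").toSeq
assert(restored.length == 2)
```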
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatePartitionAllColumnFamiliesWriterSuite.scala:
##########
@@ -146,18 +148,47 @@ class StatePartitionAllColumnFamiliesWriterSuite extends StateDataSourceTestBase
     assert(!checkpointFileExists(new File(targetDir, storeNamePath), versionToCheck, ".changelog"))
     assert(checkpointFileExists(new File(targetDir, storeNamePath), versionToCheck, ".zip"))
-    // Step 4: Read from target using normal reader
-    val targetReader = spark.read
-      .format("statestore")
-      .option(StateSourceOptions.PATH, targetDir)
-    val targetNormalData = (storeName match {
-      case Some(name) => targetReader.option(StateSourceOptions.STORE_NAME, name)
-      case None => targetReader
-    }).load()
-      .selectExpr("key", "value", "partition_id")
-      .collect()
+    // Step 3: Validate by reading from both source and target using normal reader
+    // Default selectExprs for most column families
+    val defaultSelectExprs = Seq("key", "value", "partition_id")
+
+    def shouldCheckColumnFamilyName: String => Boolean = name => {
+      (!name.startsWith("$")
+        || (columnFamilyToStateSourceOptions.contains(name) &&
Review Comment:
We won't need this anymore because the latest multi-cf reader PR supports
reading internal columns in testing
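For reference, the consolidated validation can then stay on the plain reader path for every column family; a minimal sketch of the round-trip check (reusing only the PATH/STORE_NAME options already present in the removed block; `sourceDir`, `targetDir`, `storeName`, and the helper name `readStateRows` are assumptions from the surrounding test):

```scala
import org.apache.spark.sql.Row

// Read both checkpoints back with the normal statestore reader and compare.
def readStateRows(dir: String, storeName: Option[String]): Set[Row] = {
  val reader = spark.read
    .format("statestore")
    .option(StateSourceOptions.PATH, dir)
  storeName.fold(reader)(n => reader.option(StateSourceOptions.STORE_NAME, n))
    .load()
    .selectExpr("key", "value", "partition_id")
    .collect()
    .toSet
}

assert(readStateRows(targetDir.getAbsolutePath, storeName) ==
  readStateRows(sourceDir.getAbsolutePath, storeName))
```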