zifeif2 commented on code in PR #53386:
URL: https://github.com/apache/spark/pull/53386#discussion_r2625691797


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatePartitionAllColumnFamiliesWriterSuite.scala:
##########
@@ -329,15 +389,42 @@ class StatePartitionAllColumnFamiliesWriterSuite extends 
StateDataSourceTestBase
           performRoundTripTest(
             sourceDir.getAbsolutePath,
             targetDir.getAbsolutePath,
-            keySchema,
-            valueSchema,
-            keyStateEncoderSpec
+            createSingleColumnFamilySchemaMap(keySchema, valueSchema, 
keyStateEncoderSpec)
           )
         }
       }
     }
   }
 
+  private val keyToNumValuesColFamilyNames = Seq("left-keyToNumValues", 
"right-keyToNumValues")

Review Comment:
   They are the same! Okay I'll move them somewhere



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StatePartitionWriter.scala:
##########
@@ -123,6 +159,12 @@ class StatePartitionAllColumnFamiliesWriter(
     val valueRow = new 
UnsafeRow(columnFamilyToValueSchemaLenMap(colFamilyName))
     valueRow.pointTo(valueBytes, valueBytes.length)
 
-    stateStore.put(keyRow, valueRow, colFamilyName)
+    if (columnFamilyToSchemaMap(colFamilyName).useMultipleValuesPerKey) {
+      // if a column family useMultipleValuesPerKey (e.g. ListType), we will
+      // write with 1 put followed by merge
+      stateStore.merge(keyRow, valueRow, colFamilyName)

Review Comment:
   Ah I should probably update the comment but I tried just using merge and the 
test was correct? 



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatePartitionAllColumnFamiliesWriterSuite.scala:
##########
@@ -146,18 +148,47 @@ class StatePartitionAllColumnFamiliesWriterSuite extends 
StateDataSourceTestBase
     assert(!checkpointFileExists(new File(targetDir, storeNamePath), 
versionToCheck, ".changelog"))
     assert(checkpointFileExists(new File(targetDir, storeNamePath), 
versionToCheck, ".zip"))
 
-    // Step 4: Read from target using normal reader
-    val targetReader = spark.read
-      .format("statestore")
-      .option(StateSourceOptions.PATH, targetDir)
-    val targetNormalData = (storeName match {
-      case Some(name) => targetReader.option(StateSourceOptions.STORE_NAME, 
name)
-      case None => targetReader
-    }).load()
-      .selectExpr("key", "value", "partition_id")
-      .collect()
+    // Step 3: Validate by reading from both source and target using normal 
reader"
+    // Default selectExprs for most column families
+    val defaultSelectExprs = Seq("key", "value", "partition_id")
+
+    def shouldCheckColumnFamilyName: String => Boolean = name => {
+      (!name.startsWith("$")
+        || (columnFamilyToStateSourceOptions.contains(name) &&

Review Comment:
   We won't need this anymore because the latest multi-cf reader PR supports 
reading internal columns in testing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to