micheal-o commented on code in PR #53355:
URL: https://github.com/apache/spark/pull/53355#discussion_r2600291907


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -587,6 +596,102 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
     }
     assert(ex.getMessage.contains("State store encoding format as avro is not supported"))
   }
+
+  test("Partition key extraction - Dedupe") {
+    val df = (input: Dataset[(String, Int)]) => input.dropDuplicates("_1").toDF()
+
+    testPartitionKeyExtraction(
+      StatefulOperatorsUtils.DEDUPLICATE_EXEC_OP_NAME,
+      inputData = Seq(("a", 1), ("b", 2), ("c", 3)),
+      dedupeDF = df,
+      // The key schema for dedup is just the _1 column (String)
+      keySchema = new StructType().add("_1", StringType),
+      // Empty value schema for dedup
+      valueSchema = new StructType(),
+      sqlConf = spark.sessionState.conf
+    )
+  }
+}
+
+trait StreamingDeduplicationSuiteBase { self: StreamTest =>
+  import testImplicits._
+
+  protected def testPartitionKeyExtraction(
+      operatorName: String,
+      inputData: Seq[(String, Int)],
+      dedupeDF: Dataset[(String, Int)] => DataFrame,
+      keySchema: StructType,
+      valueSchema: StructType,
+      sqlConf: SQLConf): Unit = {
+    withTempDir { checkpointDir =>
+      // 1 partition to make verification easier
+      val conf = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")
+
+      val inputStream = MemoryStream[(String, Int)]
+
+      // Run streaming query to populate state
+      testStream(dedupeDF(inputStream.toDS()), Append)(
+        StartStream(checkpointLocation = checkpointDir.getAbsolutePath, additionalConfs = conf),
+        AddData(inputStream, inputData: _*),
+        ProcessAllAvailable(),
+        StopStream
+      )
+
+      // Now access the state store to verify partition key extraction
+      val storeConf = new StateStoreConf(sqlConf)
+      val storeId = StateStoreId(checkpointDir.getAbsolutePath + "/state", 0, 0)
+      val storeProviderId = StateStoreProviderId(storeId, UUID.randomUUID())
+
+      val keyProjection = UnsafeProjection.create(keySchema)
+      def createExpectedKeyRow(inputKey: String) = {
+        val row = new GenericInternalRow(Array[Any](UTF8String.fromString(inputKey)))
+        keyProjection.apply(row).copy()
+      }
+
+      val store = StateStore.getReadOnly(
+        storeProviderId,
+        keySchema,
+        valueSchema,
+        NoPrefixKeyStateEncoderSpec(keySchema),
+        version = 1,
+        stateStoreCkptId = None,
+        stateSchemaBroadcast = None,
+        useColumnFamilies = false,
+        storeConf,
+        new Configuration
+      )
+
+      try {
+        val extractor = StatePartitionKeyExtractorFactory.create(
+          operatorName,
+          keySchema
+        )
+
+        // Verify partition key schema matches the key schema
+        assert(extractor.partitionKeySchema === keySchema,
+          "Partition key schema should match the dedup key schema")
+
+        // Get all state keys written by the dedup query
+        val stateKeys = store.iterator().map(_.key.copy()).toList
+        assert(stateKeys.length === inputData.length,
+          s"Should have ${inputData.length} unique keys, found 
${stateKeys.length}")
+
+        // Extract partition keys
+        val partitionKeys = stateKeys.map(extractor.partitionKey(_).copy())
+        // Verify each partition key equals its corresponding state key (for dedup)
+        assert(partitionKeys === stateKeys,
+          "Partition keys should match state keys")
+
+        // Expected keys
+        inputData.foreach { case (key, _) =>
+          val keyRow = createExpectedKeyRow(key)
+          assert(partitionKeys.count(_ === keyRow) == 1, s"Should have 1 partition key for $key")
+        }

Review Comment:
   Yep, to make sure it is the expected key value.
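
   A note for readers of this thread: UnsafeRow equality is byte-wise, so the
   test builds each expected key through the same UnsafeProjection that shaped
   the store keys; equal logical values in differently laid-out rows would not
   compare equal. Below is a minimal standalone sketch of that comparison,
   assuming the same key schema as the test (the object name is hypothetical
   and not part of the PR):

   import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
   import org.apache.spark.sql.types.{StringType, StructType}
   import org.apache.spark.unsafe.types.UTF8String

   object ExpectedKeyRowSketch {
     def main(args: Array[String]): Unit = {
       val keySchema = new StructType().add("_1", StringType)
       val projection = UnsafeProjection.create(keySchema)

       // copy() is needed because UnsafeProjection reuses its output buffer.
       def keyRowFor(key: String) =
         projection(new GenericInternalRow(Array[Any](UTF8String.fromString(key)))).copy()

       // UnsafeRow.equals compares the backing bytes, which is what the
       // test's partitionKeys.count(_ === keyRow) == 1 assertion relies on.
       assert(keyRowFor("a") == keyRowFor("a"))
       assert(keyRowFor("a") != keyRowFor("b"))
     }
   }

   Rows produced through a different schema or projection would have a
   different byte layout, so reusing the test's keySchema for the expected
   rows is deliberate.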


