HeartSaVioR commented on code in PR #43393:
URL: https://github.com/apache/spark/pull/43393#discussion_r1362969534


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -145,14 +142,37 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
       StopStream
     )
 
-    val statePath = new Path(checkpointDir.toString, "state/0")
-    val operatorMetadata = new OperatorStateMetadataReader(statePath, 
hadoopConf).read()
-      .asInstanceOf[OperatorStateMetadataV1]
-
     val expectedMetadata = OperatorStateMetadataV1(
       OperatorInfoV1(0, "sessionWindowStateStoreSaveExec"),
       Array(StateStoreMetadataV1("default", 1, 
spark.sessionState.conf.numShufflePartitions))
     )
-    assert(sameOperatorStateMetadata(operatorMetadata, expectedMetadata))
+    checkOperatorStateMetadata(0, expectedMetadata)
+  }
+
+  test("Stateful operator metadata for multiple operators.") {

Review Comment:
   nit: remove `.` at the end for consistency



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -32,30 +32,37 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
 
   private lazy val hadoopConf = spark.sessionState.newHadoopConf()
 
+  private var checkpointDir = Utils.createTempDir()

Review Comment:
   Our best practice is to use `withTempDir` in each test. Could we please 
follow that pattern and remove this `checkpointDir` variable and `beforeEach`?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -32,30 +32,37 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
 
   private lazy val hadoopConf = spark.sessionState.newHadoopConf()
 
+  private var checkpointDir = Utils.createTempDir()
+
   private def numShufflePartitions = 
spark.sessionState.conf.numShufflePartitions
 
-  private def sameOperatorStateMetadata(
-      operatorMetadata1: OperatorStateMetadataV1,
-      operatorMetadata2: OperatorStateMetadataV1): Boolean = {
-    operatorMetadata1.operatorInfo == operatorMetadata2.operatorInfo &&
-      
operatorMetadata1.stateStoreInfo.sameElements(operatorMetadata2.stateStoreInfo)
+  override def beforeEach(): Unit = {
+    Utils.deleteRecursively(checkpointDir)
+    checkpointDir = Utils.createTempDir()
+  }
+
+  private def checkOperatorStateMetadata(
+      operatorId: Int,
+      expectedMetadata: OperatorStateMetadataV1): Unit = {
+    val statePath = new Path(checkpointDir.toString, s"state/$operatorId")
+    val operatorMetadata = new OperatorStateMetadataReader(statePath, 
hadoopConf).read()
+      .asInstanceOf[OperatorStateMetadataV1]
+    // println("doodoo: " + operatorMetadata.stateStoreInfo)

Review Comment:
   nit: this commented-out debug `println` should be removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to