anishshri-db commented on code in PR #53747:
URL: https://github.com/apache/spark/pull/53747#discussion_r2677514140


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -157,7 +162,7 @@ class StateDataSource extends TableProvider with 
DataSourceRegister with Logging
 
         val clonedSqlConf = session.sessionState.conf.clone()
         OffsetSeqMetadata.setSessionConf(metadata, clonedSqlConf)
-        StateStoreConf(clonedSqlConf)
+        (StateStoreConf(clonedSqlConf), clonedSqlConf)

Review Comment:
   Can we not extract the SQLConf from the StateStoreConf?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -147,7 +151,8 @@ class StateDataSource extends TableProvider with 
DataSourceRegister with Logging
         sourceOptions.operatorId)
   }
 
-  private def buildStateStoreConf(checkpointLocation: String, batchId: Long): 
StateStoreConf = {
+  private def buildConfsForBatch(
+      checkpointLocation: String, batchId: Long): (StateStoreConf, SQLConf) = {

Review Comment:
   nit: move both args to a new line?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OfflineStateRepartitionSuite.scala:
##########
@@ -298,7 +349,108 @@ class OfflineStateRepartitionSuite extends StreamTest {
           getShufflePartitions(lastMetadata).get != 
getShufflePartitions(previousMetadata).get,
           "Shuffle partitions should be different between batches")
       case _ =>
-        fail("Both batches should have metadata")
+        assert(false, "Both batches should have metadata")
+    }
+  }
+
+  // verify number of partition dirs in state dir
+  private def verifyPartitionDirs(
+      checkpointLocation: String, expectedShufflePartitions: Int): Unit = {

Review Comment:
   Move the args to a new line?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OfflineStateRepartitionRunner.scala:
##########
@@ -229,6 +246,48 @@ class OfflineStateRepartitionRunner(
     newBatchId
   }
 
+  private def updateNumPartitionsInOperatorMetadata(
+      newBatchId: Long, readBatchId: Long): Unit = {

Review Comment:
   nit: same here (move the args to a new line)?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OfflineStateRepartitionSuite.scala:
##########
@@ -298,7 +349,108 @@ class OfflineStateRepartitionSuite extends StreamTest {
           getShufflePartitions(lastMetadata).get != 
getShufflePartitions(previousMetadata).get,
           "Shuffle partitions should be different between batches")
       case _ =>
-        fail("Both batches should have metadata")
+        assert(false, "Both batches should have metadata")
+    }
+  }
+
+  // verify number of partition dirs in state dir
+  private def verifyPartitionDirs(
+      checkpointLocation: String, expectedShufflePartitions: Int): Unit = {
+    val stateDir = new java.io.File(checkpointLocation, "state")
+
+    def numDir(file: java.io.File): Int = {

Review Comment:
   nit: rename to `numDirs`?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OfflineStateRepartitionSuite.scala:
##########
@@ -198,15 +219,18 @@ class OfflineStateRepartitionSuite extends StreamTest {
   test("Consecutive repartition") {

Review Comment:
   Are we covering all cases now?
   - repartition to lower than before
   - repartition to higher than before
   - same as before, which should be a no-op
   - consecutive repartitions



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to