micheal-o commented on code in PR #54529:
URL: https://github.com/apache/spark/pull/54529#discussion_r2898433220
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -2465,6 +2465,107 @@ abstract class StateStoreSuiteBase[ProviderClass <:
StateStoreProvider]
}
}
+ test("fileChecksumThreadPoolSize propagates to
ChecksumCheckpointFileManager") {
+ Seq(0, 1, 6).foreach { numThreads =>
+ val storeId = StateStoreId(newDir(), 0L, 0)
+ withSQLConf(
+ SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true",
+ SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key ->
numThreads.toString) {
+ tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) {
provider =>
+ val fmMethod = PrivateMethod[CheckpointFileManager](Symbol("fm"))
+ val fm = provider match {
+ case hdfs: HDFSBackedStateStoreProvider =>
+ hdfs.fm
+ case rocksdb: RocksDBStateStoreProvider =>
+ rocksdb.rocksDB.fileManager invokePrivate fmMethod()
+ case _ =>
+ fail(s"Unexpected provider type: ${provider.getClass.getName}")
+ }
+ assert(fm.isInstanceOf[ChecksumCheckpointFileManager])
+ assert(fm.asInstanceOf[ChecksumCheckpointFileManager].numThreads ===
numThreads)
+ }
+ }
+ }
+ }
+
+ test("STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE: invalid negative value is
rejected") {
+ val sqlConf = SQLConf.get.clone()
+ val ex = intercept[IllegalArgumentException] {
+
sqlConf.setConfString(SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key,
"-1")
+ }
+ assert(ex.getMessage.contains("Must be a non-negative integer"))
+ }
+
+ test("fileChecksumThreadPoolSize = 0 supports sequential I/O (load, write,
commit, reload)") {
+ val storeId = StateStoreId(newDir(), 0L, 0)
+ withSQLConf(
+ SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true",
+ SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key -> "0") {
+ // Write some state with sequential mode enabled
+ tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) {
provider =>
+ val store = provider.getStore(0)
+ put(store, "a", 0, 1)
+ put(store, "b", 0, 2)
+ store.commit()
+ provider.doMaintenance()
Review Comment:
Maintenance isn't uploading anything here — we are not producing snapshots to
upload. The same applies to all the other maintenance calls below.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -150,14 +152,30 @@ class ChecksumCheckpointFileManager(
val numThreads: Int,
val skipCreationIfFileMissingChecksum: Boolean)
extends CheckpointFileManager with Logging {
- assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1
for the main file" +
- "and another for the checksum file")
+ assert(numThreads >= 0, "numThreads must be a non-negative integer")
+ if (numThreads < 4) {
+ logWarning(s"numThreads is set to $numThreads, which is below the
recommended minimum of 4 " +
Review Comment:
This log shouldn't be here. It's the caller (e.g., the state store) that knows
how many threads it will need.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]