micheal-o commented on code in PR #54529:
URL: https://github.com/apache/spark/pull/54529#discussion_r2875610836
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -3695,6 +3684,18 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE =
+ buildConf("spark.sql.streaming.stateStore.fileChecksumThreadPoolSize")
+ .internal()
+ .doc("Number of threads used to compute file checksums concurrently when
uploading " +
+ "state store checkpoints (e.g. main file and checksum file). " +
Review Comment:
nit: Number of threads used to read/write files and their corresponding
checksum files concurrently
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -150,14 +153,33 @@ class ChecksumCheckpointFileManager(
val numThreads: Int,
val skipCreationIfFileMissingChecksum: Boolean)
extends CheckpointFileManager with Logging {
- assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1
for the main file" +
- "and another for the checksum file")
+ assert(numThreads >= 0, "numThreads must be a non-negative integer")
import ChecksumCheckpointFileManager._
- // This allows us to concurrently read/write the main file and checksum file
- private val threadPool = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonFixedThreadPool(numThreads,
s"${this.getClass.getSimpleName}-Thread"))
+ // Thread pool for concurrent execution, or None when numThreads == 0
(sequential/inline mode).
+ private val threadPool: Option[ExecutionContextExecutorService] =
Review Comment:
nit: rename to `threadPoolOpt` to make it obvious
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -150,14 +153,33 @@ class ChecksumCheckpointFileManager(
val numThreads: Int,
val skipCreationIfFileMissingChecksum: Boolean)
extends CheckpointFileManager with Logging {
- assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1
for the main file" +
- "and another for the checksum file")
+ assert(numThreads >= 0, "numThreads must be a non-negative integer")
import ChecksumCheckpointFileManager._
- // This allows us to concurrently read/write the main file and checksum file
- private val threadPool = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonFixedThreadPool(numThreads,
s"${this.getClass.getSimpleName}-Thread"))
+ // Thread pool for concurrent execution, or None when numThreads == 0
(sequential/inline mode).
Review Comment:
please let's also retain the original comment here
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -150,14 +153,33 @@ class ChecksumCheckpointFileManager(
val numThreads: Int,
val skipCreationIfFileMissingChecksum: Boolean)
extends CheckpointFileManager with Logging {
- assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1
for the main file" +
- "and another for the checksum file")
+ assert(numThreads >= 0, "numThreads must be a non-negative integer")
import ChecksumCheckpointFileManager._
- // This allows us to concurrently read/write the main file and checksum file
- private val threadPool = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonFixedThreadPool(numThreads,
s"${this.getClass.getSimpleName}-Thread"))
+ // Thread pool for concurrent execution, or None when numThreads == 0
(sequential/inline mode).
+ private val threadPool: Option[ExecutionContextExecutorService] =
+ if (numThreads == 0) None
+ else Some(ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonFixedThreadPool(numThreads,
s"${this.getClass.getSimpleName}-Thread")))
+
+ // ExecutionContext to pass to stream classes: uses the thread pool or a
caller-runs context.
+ private val streamContext: ExecutionContext = threadPool.getOrElse(new
ExecutionContext {
+ override def execute(runnable: Runnable): Unit = runnable.run()
Review Comment:
nit: comment: `This will execute the runnable synchronously`
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -150,14 +153,33 @@ class ChecksumCheckpointFileManager(
val numThreads: Int,
val skipCreationIfFileMissingChecksum: Boolean)
extends CheckpointFileManager with Logging {
- assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1
for the main file" +
- "and another for the checksum file")
+ assert(numThreads >= 0, "numThreads must be a non-negative integer")
import ChecksumCheckpointFileManager._
- // This allows us to concurrently read/write the main file and checksum file
- private val threadPool = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonFixedThreadPool(numThreads,
s"${this.getClass.getSimpleName}-Thread"))
+ // Thread pool for concurrent execution, or None when numThreads == 0
(sequential/inline mode).
+ private val threadPool: Option[ExecutionContextExecutorService] =
+ if (numThreads == 0) None
+ else Some(ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonFixedThreadPool(numThreads,
s"${this.getClass.getSimpleName}-Thread")))
+
+ // ExecutionContext to pass to stream classes: uses the thread pool or a
caller-runs context.
Review Comment:
nit: you mean input/outStream? Lets make it clear
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -127,12 +128,14 @@ case class ChecksumFile(path: Path) {
* orphan checksum files. If using this, it is
your responsibility
* to clean up the potential orphan checksum
files.
* @param numThreads This is the number of threads to use for the thread pool,
for reading/writing
- * files. To avoid blocking, if the file manager instance is
being used by a
- * single thread, then you can set this to 2 (one thread for
main file, another
- * for checksum file).
- * If file manager is shared by multiple threads, you can
set it to
- * number of threads using file manager * 2.
- * Setting this differently can lead to file operation being
blocked waiting for
+ * files. Must be a non-negative integer.
+ * Setting this to 0 disables the thread pool and runs all
operations
+ * sequentially on the calling thread (no concurrency).
+ * To avoid blocking with a single concurrent caller, set
this to 2 (one thread
Review Comment:
> To avoid blocking
For the remaining statements, let's use the original statement. This new one
is not very clear.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -150,14 +153,33 @@ class ChecksumCheckpointFileManager(
val numThreads: Int,
val skipCreationIfFileMissingChecksum: Boolean)
extends CheckpointFileManager with Logging {
- assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1
for the main file" +
- "and another for the checksum file")
+ assert(numThreads >= 0, "numThreads must be a non-negative integer")
import ChecksumCheckpointFileManager._
- // This allows us to concurrently read/write the main file and checksum file
- private val threadPool = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonFixedThreadPool(numThreads,
s"${this.getClass.getSimpleName}-Thread"))
+ // Thread pool for concurrent execution, or None when numThreads == 0
(sequential/inline mode).
+ private val threadPool: Option[ExecutionContextExecutorService] =
+ if (numThreads == 0) None
+ else Some(ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonFixedThreadPool(numThreads,
s"${this.getClass.getSimpleName}-Thread")))
+
+ // ExecutionContext to pass to stream classes: uses the thread pool or a
caller-runs context.
+ private val streamContext: ExecutionContext = threadPool.getOrElse(new
ExecutionContext {
Review Comment:
why is this named `streamContext`?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -150,14 +153,33 @@ class ChecksumCheckpointFileManager(
val numThreads: Int,
val skipCreationIfFileMissingChecksum: Boolean)
extends CheckpointFileManager with Logging {
- assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1
for the main file" +
- "and another for the checksum file")
+ assert(numThreads >= 0, "numThreads must be a non-negative integer")
import ChecksumCheckpointFileManager._
- // This allows us to concurrently read/write the main file and checksum file
- private val threadPool = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonFixedThreadPool(numThreads,
s"${this.getClass.getSimpleName}-Thread"))
+ // Thread pool for concurrent execution, or None when numThreads == 0
(sequential/inline mode).
+ private val threadPool: Option[ExecutionContextExecutorService] =
+ if (numThreads == 0) None
+ else Some(ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonFixedThreadPool(numThreads,
s"${this.getClass.getSimpleName}-Thread")))
+
+ // ExecutionContext to pass to stream classes: uses the thread pool or a
caller-runs context.
+ private val streamContext: ExecutionContext = threadPool.getOrElse(new
ExecutionContext {
+ override def execute(runnable: Runnable): Unit = runnable.run()
+ override def reportFailure(cause: Throwable): Unit = throw cause
+ })
+
+ /**
+ * Schedules a computation on the thread pool, or runs it directly on the
calling thread
+ * if numThreads == 0 (sequential mode).
+ */
+ private def scheduleOrRun[T](f: => T): Future[T] = threadPool match {
+ case None =>
+ try Future.successful(f)
Review Comment:
nit: `try {}`
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -150,14 +153,33 @@ class ChecksumCheckpointFileManager(
val numThreads: Int,
val skipCreationIfFileMissingChecksum: Boolean)
extends CheckpointFileManager with Logging {
- assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1
for the main file" +
- "and another for the checksum file")
+ assert(numThreads >= 0, "numThreads must be a non-negative integer")
import ChecksumCheckpointFileManager._
- // This allows us to concurrently read/write the main file and checksum file
- private val threadPool = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonFixedThreadPool(numThreads,
s"${this.getClass.getSimpleName}-Thread"))
+ // Thread pool for concurrent execution, or None when numThreads == 0
(sequential/inline mode).
+ private val threadPool: Option[ExecutionContextExecutorService] =
+ if (numThreads == 0) None
+ else Some(ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonFixedThreadPool(numThreads,
s"${this.getClass.getSimpleName}-Thread")))
+
+ // ExecutionContext to pass to stream classes: uses the thread pool or a
caller-runs context.
+ private val streamContext: ExecutionContext = threadPool.getOrElse(new
ExecutionContext {
+ override def execute(runnable: Runnable): Unit = runnable.run()
+ override def reportFailure(cause: Throwable): Unit = throw cause
+ })
+
+ /**
+ * Schedules a computation on the thread pool, or runs it directly on the
calling thread
+ * if numThreads == 0 (sequential mode).
+ */
+ private def scheduleOrRun[T](f: => T): Future[T] = threadPool match {
Review Comment:
Why have both `scheduleOrRun` and the synchronous ExecutionContext above? Why not
only use the sync ExecutionContext? Not sure I get why we need both.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -1547,6 +1547,35 @@ class StateStoreSuite extends
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
}
}
+ test("fileChecksumThreadPoolSize propagates to
ChecksumCheckpointFileManager") {
+ Seq(0, 1, 6).foreach { numThreads =>
+ withTempDir { dir =>
+ val sqlConf = SQLConf.get.clone()
+
sqlConf.setConfString(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key,
"true")
+ sqlConf.setConfString(
+ SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key,
numThreads.toString)
+
+ val provider = newStoreProvider(
+ opId = Random.nextInt(), partition = 0, dir = dir.getCanonicalPath,
+ sqlConfOpt = Some(sqlConf))
+ val fileManagerMethod =
PrivateMethod[CheckpointFileManager](Symbol("fm"))
+ val fm = provider invokePrivate fileManagerMethod()
+
+ assert(fm.isInstanceOf[ChecksumCheckpointFileManager])
+ assert(fm.asInstanceOf[ChecksumCheckpointFileManager].numThreads ===
numThreads)
+ provider.close()
+ }
+ }
+ }
Review Comment:
Let's also add a test case with the thread pool disabled to make sure using the
state store works, i.e. load, write, get, commit, reload, etc. You can also do
concurrent store.commit and maintenance, to make sure they both finish.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManagerSuite.scala:
##########
@@ -250,6 +250,30 @@ abstract class ChecksumCheckpointFileManagerSuite extends
CheckpointFileManagerT
checksumFmWithoutFallback.close()
}
}
+
+ test("numThreads = 0 disables thread pool") {
+ withTempHadoopPath { basePath =>
+ val fm = new ChecksumCheckpointFileManager(
+ createNoChecksumManager(basePath),
+ allowConcurrentDelete = true,
+ numThreads = 0,
+ skipCreationIfFileMissingChecksum = false)
+ fm.close()
Review Comment:
This isn't testing much. We should write and read a file to confirm that
main and checksum files are written and read
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -533,11 +533,10 @@ private[sql] class HDFSBackedStateStoreProvider extends
StateStoreProvider with
mgr,
// Allowing this for perf, since we do orphan checksum file cleanup in
maintenance anyway
allowConcurrentDelete = true,
- // We need 2 threads per fm caller to avoid blocking
- // (one for main file and another for checksum file).
- // Since this fm is used by both query task and maintenance thread,
- // then we need 2 * 2 = 4 threads.
- numThreads = 4,
+ // To avoid blocking, we need 2 threads per fm caller (one for main
file, one for checksum
+ // file). Since this fm is used by both query task and maintenance
thread, the recommended
+ // default is 2 * 2 = 4 threads. A value of 0 disables the thread pool
(sequential mode).
+ numThreads = storeConf.fileChecksumThreadPoolSize,
Review Comment:
let's also do the logging for this too, right?
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -1547,6 +1547,35 @@ class StateStoreSuite extends
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
}
}
+ test("fileChecksumThreadPoolSize propagates to
ChecksumCheckpointFileManager") {
Review Comment:
The test cases are currently under StateStoreSuite; if you look at the class
def, it is for the HDFS store only. This should be under `StateStoreSuiteBase` so
that it runs for RocksDB too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]