gnanda commented on code in PR #54529:
URL: https://github.com/apache/spark/pull/54529#discussion_r2881389818
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -150,14 +153,33 @@ class ChecksumCheckpointFileManager(
val numThreads: Int,
val skipCreationIfFileMissingChecksum: Boolean)
extends CheckpointFileManager with Logging {
- assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1
for the main file" +
- "and another for the checksum file")
+ assert(numThreads >= 0, "numThreads must be a non-negative integer")
import ChecksumCheckpointFileManager._
- // This allows us to concurrently read/write the main file and checksum file
- private val threadPool = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonFixedThreadPool(numThreads,
s"${this.getClass.getSimpleName}-Thread"))
+ // Thread pool for concurrent execution, or None when numThreads == 0
(sequential/inline mode).
+ private val threadPool: Option[ExecutionContextExecutorService] =
+ if (numThreads == 0) None
+ else Some(ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonFixedThreadPool(numThreads,
s"${this.getClass.getSimpleName}-Thread")))
+
+ // ExecutionContext to pass to stream classes: uses the thread pool or a
caller-runs context.
+ private val streamContext: ExecutionContext = threadPool.getOrElse(new
ExecutionContext {
Review Comment:
Changed to `executionContext` (the AI chose the original name; I wasn't sure
whether it was a Scala convention).
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -150,14 +153,33 @@ class ChecksumCheckpointFileManager(
val numThreads: Int,
val skipCreationIfFileMissingChecksum: Boolean)
extends CheckpointFileManager with Logging {
- assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1
for the main file" +
- "and another for the checksum file")
+ assert(numThreads >= 0, "numThreads must be a non-negative integer")
import ChecksumCheckpointFileManager._
- // This allows us to concurrently read/write the main file and checksum file
- private val threadPool = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonFixedThreadPool(numThreads,
s"${this.getClass.getSimpleName}-Thread"))
+ // Thread pool for concurrent execution, or None when numThreads == 0
(sequential/inline mode).
+ private val threadPool: Option[ExecutionContextExecutorService] =
+ if (numThreads == 0) None
+ else Some(ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonFixedThreadPool(numThreads,
s"${this.getClass.getSimpleName}-Thread")))
+
+ // ExecutionContext to pass to stream classes: uses the thread pool or a
caller-runs context.
+ private val streamContext: ExecutionContext = threadPool.getOrElse(new
ExecutionContext {
+ override def execute(runnable: Runnable): Unit = runnable.run()
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]