gnanda commented on code in PR #54529:
URL: https://github.com/apache/spark/pull/54529#discussion_r2881392739
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -150,14 +153,33 @@ class ChecksumCheckpointFileManager(
val numThreads: Int,
val skipCreationIfFileMissingChecksum: Boolean)
extends CheckpointFileManager with Logging {
- assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1
for the main file" +
- "and another for the checksum file")
+ assert(numThreads >= 0, "numThreads must be a non-negative integer")
import ChecksumCheckpointFileManager._
- // This allows us to concurrently read/write the main file and checksum file
- private val threadPool = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonFixedThreadPool(numThreads,
s"${this.getClass.getSimpleName}-Thread"))
+ // Thread pool for concurrent execution, or None when numThreads == 0
(sequential/inline mode).
+ private val threadPool: Option[ExecutionContextExecutorService] =
+ if (numThreads == 0) None
+ else Some(ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonFixedThreadPool(numThreads,
s"${this.getClass.getSimpleName}-Thread")))
+
+ // ExecutionContext to pass to stream classes: uses the thread pool or a
caller-runs context.
+ private val streamContext: ExecutionContext = threadPool.getOrElse(new
ExecutionContext {
+ override def execute(runnable: Runnable): Unit = runnable.run()
+ override def reportFailure(cause: Throwable): Unit = throw cause
+ })
+
+ /**
+ * Schedules a computation on the thread pool, or runs it directly on the
calling thread
+ * if numThreads == 0 (sequential mode).
+ */
+ private def scheduleOrRun[T](f: => T): Future[T] = threadPool match {
+ case None =>
+ try Future.successful(f)
Review Comment:
No longer applicable
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]