Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/16947#discussion_r101862350
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
---
@@ -109,39 +108,12 @@ class HDFSMetadataLog[T <: AnyRef :
ClassTag](sparkSession: SparkSession, path:
override def add(batchId: Long, metadata: T): Boolean = {
get(batchId).map(_ => false).getOrElse {
// Only write metadata when the batch has not yet been written
- if (fileManager.isLocalFileSystem) {
- Thread.currentThread match {
- case ut: UninterruptibleThread =>
- // When using a local file system, "writeBatch" must be called
on a
- // [[org.apache.spark.util.UninterruptibleThread]] so that
interrupts can be disabled
- // while writing the batch file.
- //
- // This is because Hadoop "Shell.runCommand" swallows
InterruptException (HADOOP-14084).
- // If the user tries to stop a query, and the thread running
"Shell.runCommand" is
- // interrupted, then InterruptException will be dropped and
the query will be still
- // running. (Note: `writeBatch` creates a file using HDFS APIs
and will call
- // "Shell.runCommand" to set the file permission if using the
local file system)
- //
- // Hence, we make sure that "writeBatch" is called on
[[UninterruptibleThread]] which
- // allows us to disable interrupts here, in order to propagate
the interrupt state
- // correctly. Also see SPARK-19599.
- ut.runUninterruptibly { writeBatch(batchId, metadata) }
- case _ =>
- throw new IllegalStateException(
- "HDFSMetadataLog.add() on a local file system must be
executed on " +
- "a o.a.spark.util.UninterruptibleThread")
- }
- } else {
- // For a distributed file system, such as HDFS or S3, if the
network is broken, write
- // operations may just hang until timeout. We should enable
interrupts to allow stopping
- // the query fast.
- writeBatch(batchId, metadata)
- }
+ writeBatch(batchId, metadata)
--- End diff --
Didnt we disable interrupt because with local files, hadoop used shell
commands to do file manipulation which could hang when interrupted? Are we
removing this now because that has been fixed in hadoop?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]