HeartSaVioR commented on a change in pull request #29187:
URL: https://github.com/apache/spark/pull/29187#discussion_r459128784
##########
File path: core/src/main/scala/org/apache/spark/util/UninterruptibleThreadRunner.scala
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent.Executors
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+
+private[spark] class UninterruptibleThreadRunner(threadName: String) {

Review comment:
   Probably better to have a classdoc, as this is expected to be used from various callers; otherwise it could just as well live in the spark-sql-kafka module.

##########
File path: core/src/main/scala/org/apache/spark/util/UninterruptibleThreadRunner.scala
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent.Executors
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+
+private[spark] class UninterruptibleThreadRunner(threadName: String) {
+  private val thread = Executors.newSingleThreadExecutor((r: Runnable) => {
+    val t = new UninterruptibleThread(threadName) {
+      override def run(): Unit = {
+        r.run()
+      }
+    }
+    t.setDaemon(true)
+    t
+  })
+  private val execContext = ExecutionContext.fromExecutorService(thread)
+
+  def runUninterruptibly[T](body: => T): T = {

Review comment:
   It'd be worth migrating part of the original comment here to construct a method doc: `This method ensures that the closure is called in an [[UninterruptibleThread]].`

##########
File path: core/src/main/scala/org/apache/spark/util/UninterruptibleThreadRunner.scala
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent.Executors
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+
+private[spark] class UninterruptibleThreadRunner(threadName: String) {
+  private val thread = Executors.newSingleThreadExecutor((r: Runnable) => {
+    val t = new UninterruptibleThread(threadName) {
+      override def run(): Unit = {
+        r.run()
+      }
+    }
+    t.setDaemon(true)
+    t
+  })
+  private val execContext = ExecutionContext.fromExecutorService(thread)
+
+  def runUninterruptibly[T](body: => T): T = {
+    if (!Thread.currentThread.isInstanceOf[UninterruptibleThread]) {
+      val future = Future {
+        body
+      }(execContext)
+      ThreadUtils.awaitResult(future, Duration.Inf)
+    } else {
+      body
+    }
+  }
+
+  def close(): Unit = {

Review comment:
   minor: older versions of Scala had a ThreadRunner with a `shutdown()` method. Personally I find it clearer when the name describes what will actually happen, so `shutdown()` is probably a better name, but it may be only me. https://javadoc.io/doc/org.scala-lang/scala-library/2.10.3/scala/concurrent/ThreadRunner.html
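Putting the three suggestions above together, a rough sketch of what the documented class could look like follows. The doc wording, the `shutdown()` name, and its body are illustrative assumptions based only on the review comments and the diff shown here, not the text that was actually merged in the PR:

```scala
package org.apache.spark.util

import java.util.concurrent.Executors

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration

/**
 * Illustrative classdoc: runs a closure on a dedicated daemon [[UninterruptibleThread]],
 * so that callers which are not already on an uninterruptible thread can still execute
 * code that must not be interrupted (e.g. Kafka offset fetches).
 */
private[spark] class UninterruptibleThreadRunner(threadName: String) {
  private val thread = Executors.newSingleThreadExecutor((r: Runnable) => {
    val t = new UninterruptibleThread(threadName) {
      override def run(): Unit = {
        r.run()
      }
    }
    t.setDaemon(true)
    t
  })
  private val execContext = ExecutionContext.fromExecutorService(thread)

  /**
   * Illustrative method doc: this method ensures that the closure is called in an
   * [[UninterruptibleThread]]. If the caller is already on an uninterruptible thread,
   * the closure runs inline; otherwise it is submitted to the dedicated thread and awaited.
   */
  def runUninterruptibly[T](body: => T): T = {
    if (!Thread.currentThread.isInstanceOf[UninterruptibleThread]) {
      val future = Future {
        body
      }(execContext)
      ThreadUtils.awaitResult(future, Duration.Inf)
    } else {
      body
    }
  }

  /** Illustrative rename of close() per the review comment above; the body is an assumption. */
  def shutdown(): Unit = {
    thread.shutdown()
  }
}
```

Call sites would then wrap their Kafka interactions along the lines of `runner.runUninterruptibly { fetchOffsets() }` (hypothetical helper) instead of managing an UninterruptibleThread themselves.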
##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -551,23 +537,6 @@ private[kafka010] class KafkaOffsetReader(
     }
   }
-  /**

Review comment:
   I see the refactored class is created in core, which is fine for me, but we'd better keep the original comment here, in the comment on the field, or in the classdoc of KafkaOffsetReader, so that we still explain "why" we have to use UninterruptibleThread in KafkaOffsetReader.

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -51,19 +48,8 @@ private[kafka010] class KafkaOffsetReader(
     val driverKafkaParams: ju.Map[String, Object],
     readerOptions: CaseInsensitiveMap[String],
     driverGroupIdPrefix: String) extends Logging {
-  /**
-   * Used to ensure execute fetch operations execute in an UninterruptibleThread
-   */
-  val kafkaReaderThread = Executors.newSingleThreadExecutor((r: Runnable) => {
-    val t = new UninterruptibleThread("Kafka Offset Reader") {
-      override def run(): Unit = {
-        r.run()
-      }
-    }
-    t.setDaemon(true)
-    t
-  })
-  val execContext = ExecutionContext.fromExecutorService(kafkaReaderThread)
+
+  val uninterruptibleThreadRunner = new UninterruptibleThreadRunner("Kafka Offset Reader")

Review comment:
   Let's only remove the comment when it's no longer valid or no longer useful. I think it provides some information about the Kafka-specific trick we use here. We could probably consolidate the two comments (this one and the one below that you also deleted) into one and leave it here.
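On the KafkaOffsetReader side, one way to act on the last two comments would be to consolidate the deleted explanation onto the new field. The wording below is an assumption standing in for whatever the deleted comments actually said, not the comment that ended up in the PR:

```scala
  /**
   * Illustrative consolidated comment: all Kafka offset fetch operations are routed through
   * this runner so that they execute in an UninterruptibleThread; the Kafka-specific reason
   * (from the deleted comments) for requiring an uninterruptible thread would be restated here.
   */
  val uninterruptibleThreadRunner = new UninterruptibleThreadRunner("Kafka Offset Reader")
```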
