Repository: spark
Updated Branches:
  refs/heads/master 275415777 -> a65736996


[SPARK-14540][CORE] Fix remaining major issues for Scala 2.12 Support

## What changes were proposed in this pull request?
This PR addresses issues 2 and 3 in this 
[document](https://docs.google.com/document/d/1fbkjEL878witxVQpOCbjlvOvadHtVjYXeB-2mgzDTvk).

* We modified the closure cleaner to identify closures implemented via the 
LambdaMetaFactory mechanism (serialized lambdas) (issue 2); a condensed sketch 
of the detection idea follows this list.

* We also fix the issue caused by scala/bug#11016. There are two options for 
solving the Unit problem: either add () at the end of the closure or use the 
trick described in the doc. Without one of these, overload resolution does not 
work (and we are not going to eliminate either of the methods): the compiler 
adapts the closure's result type to Unit and treats both methods as candidates, 
whereas with a polymorphic overload there is no ambiguity (that is the 
workaround implemented). It does not look great, but it serves its purpose, 
since we need to support two different uses of `addTaskCompletionListener`: one 
that takes a TaskCompletionListener and one that takes a closure that is later 
wrapped in a TaskCompletionListener (issue 3). The second sketch below shows 
the shape of the overload pair.
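
A condensed sketch of the detection idea, with simplified names (the actual logic lives in `ClosureCleaner.getSerializedLambda`/`inspect` in this diff): on 2.12 an LMF closure is a synthetic class implementing `scala.Serializable`, and its compiler-generated `writeReplace` method returns a `java.lang.invoke.SerializedLambda` describing the capturing class and implementation method.

```scala
import java.lang.invoke.SerializedLambda

object LambdaInspector {
  // Returns the SerializedLambda descriptor if the closure was compiled
  // through LambdaMetaFactory, None otherwise (error handling trimmed).
  def getSerializedLambda(closure: AnyRef): Option[SerializedLambda] = {
    val isCandidate =
      closure.getClass.isSynthetic &&
        closure.getClass.getInterfaces.exists(_.getName == "scala.Serializable")
    if (!isCandidate) {
      None
    } else {
      try {
        val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
        writeReplace.setAccessible(true)
        Option(writeReplace.invoke(closure).asInstanceOf[SerializedLambda])
      } catch {
        case _: Exception => None // not a serialized lambda
      }
    }
  }
}
```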
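
The overload pair itself, as a condensed sketch with simplified types (the real listener type is `org.apache.spark.util.TaskCompletionListener`): making the closure overload polymorphic in its result type keeps the compiler from adapting the lambda's result to Unit and considering both methods equally applicable under 2.12 SAM conversion.

```scala
trait TaskCompletionListener {
  def onTaskCompletion(context: TaskContext): Unit
}

abstract class TaskContext {
  // Overload taking a listener object.
  def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext

  // Overload taking a closure; polymorphic in U so that a lambda returning
  // Unit does not make both overloads ambiguous on Scala 2.12+.
  def addTaskCompletionListener[U](f: TaskContext => U): TaskContext =
    addTaskCompletionListener(new TaskCompletionListener {
      override def onTaskCompletion(context: TaskContext): Unit = f(context)
    })
}
```

Call sites that pass a block returning Unit pin the type parameter explicitly, e.g. `context.addTaskCompletionListener[Unit] { _ => reader.close() }`, which is why the diff below touches so many callers.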

Note: regarding issue 1 in the doc, the plan is:

> Do Nothing. Don’t try to fix this as this is only a problem for Java users 
> who would want to use 2.11 binaries. In that case they can cast to 
> MapFunction to be able to utilize lambdas. In Spark 3.0.0 the API should be 
> simplified so that this issue is removed.

## How was this patch tested?
This was manually tested:
```
./dev/change-scala-version.sh 2.12
./build/mvn -DskipTests -Pscala-2.12 clean package
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.serializer.ProactiveClosureSerializationSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.util.ClosureCleanerSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.streaming.DStreamClosureSuite -Dtest=None
```

Author: Stavros Kontopoulos <stavros.kontopou...@lightbend.com>

Closes #21930 from skonto/scala2.12-sup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6573699
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6573699
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6573699

Branch: refs/heads/master
Commit: a65736996b2b506f61cc8d599ec9f4c52a1b5312
Parents: 2754157
Author: Stavros Kontopoulos <stavros.kontopou...@lightbend.com>
Authored: Thu Aug 2 09:17:09 2018 -0500
Committer: Sean Owen <sro...@gmail.com>
Committed: Thu Aug 2 09:17:09 2018 -0500

----------------------------------------------------------------------
 .../scala/org/apache/spark/TaskContext.scala    |   5 +-
 .../apache/spark/api/python/PythonRunner.scala  |   2 +-
 .../spark/broadcast/TorrentBroadcast.scala      |   2 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |   2 +-
 .../scala/org/apache/spark/rdd/JdbcRDD.scala    |   2 +-
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |   2 +-
 .../spark/rdd/ReliableCheckpointRDD.scala       |   2 +-
 .../spark/shuffle/BlockStoreShuffleReader.scala |   2 +-
 .../storage/ShuffleBlockFetcherIterator.scala   |   2 +-
 .../spark/storage/memory/MemoryStore.scala      |   2 +-
 .../org/apache/spark/util/ClosureCleaner.scala  | 278 ++++++++++++-------
 .../util/collection/ExternalAppendOnlyMap.scala |   2 +-
 .../apache/spark/util/ClosureCleanerSuite.scala |   3 +
 .../spark/util/ClosureCleanerSuite2.scala       |  53 +++-
 .../apache/spark/sql/avro/AvroFileFormat.scala  |   2 +-
 .../spark/sql/kafka010/KafkaSourceRDD.scala     |   2 +-
 .../spark/streaming/kafka010/KafkaRDD.scala     |   2 +-
 .../spark/ml/source/libsvm/LibSVMRelation.scala |   2 +-
 .../aggregate/TungstenAggregationIterator.scala |   2 +-
 .../sql/execution/arrow/ArrowConverters.scala   |   4 +-
 .../columnar/InMemoryTableScanExec.scala        |   2 +-
 .../execution/datasources/CodecStreams.scala    |   2 +-
 .../sql/execution/datasources/FileScanRDD.scala |   2 +-
 .../datasources/csv/CSVDataSource.scala         |   2 +-
 .../execution/datasources/jdbc/JDBCRDD.scala    |   2 +-
 .../datasources/json/JsonDataSource.scala       |   2 +-
 .../datasources/orc/OrcFileFormat.scala         |   4 +-
 .../datasources/parquet/ParquetFileFormat.scala |   4 +-
 .../datasources/text/TextFileFormat.scala       |   2 +-
 .../datasources/v2/DataSourceRDD.scala          |   2 +-
 .../spark/sql/execution/joins/HashJoin.scala    |   2 +-
 .../execution/joins/ShuffledHashJoinExec.scala  |   2 +-
 .../python/AggregateInPandasExec.scala          |   2 +-
 .../execution/python/ArrowPythonRunner.scala    |   2 +-
 .../sql/execution/python/EvalPythonExec.scala   |   2 +-
 .../execution/python/PythonForeachWriter.scala  |   2 +-
 .../execution/python/WindowInPandasExec.scala   |   2 +-
 .../continuous/ContinuousCoalesceRDD.scala      |   5 +-
 .../continuous/ContinuousQueuedDataReader.scala |   2 +-
 .../shuffle/ContinuousShuffleReadRDD.scala      |   2 +-
 .../state/SymmetricHashJoinStateManager.scala   |   2 +-
 .../sql/execution/streaming/state/package.scala |   2 +-
 .../spark/sql/hive/orc/OrcFileFormat.scala      |   3 +-
 43 files changed, 266 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala 
b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 6973974..ceadf10 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -123,7 +123,10 @@ abstract class TaskContext extends Serializable {
    *
    * Exceptions thrown by the listener will result in failure of the task.
    */
-  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
+  def addTaskCompletionListener[U](f: (TaskContext) => U): TaskContext = {
+    // Note that due to this scala bug: 
https://github.com/scala/bug/issues/11016, we need to make
+    // this function polymorphic for every scala version >= 2.12, otherwise an 
overloaded method
+    // resolution error occurs at compile time.
     addTaskCompletionListener(new TaskCompletionListener {
       override def onTaskCompletion(context: TaskContext): Unit = f(context)
     })

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index ebabedf..7b31857 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -94,7 +94,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     // Start a thread to feed the process input from our parent's iterator
     val writerThread = newWriterThread(env, worker, inputIterator, 
partitionIndex, context)
 
-    context.addTaskCompletionListener { _ =>
+    context.addTaskCompletionListener[Unit] { _ =>
       writerThread.shutdownOnTaskCompletion()
       if (!reuseWorker || !released.get) {
         try {

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index e125095..cbd49e0 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -262,7 +262,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, 
id: Long)
     val blockManager = SparkEnv.get.blockManager
     Option(TaskContext.get()) match {
       case Some(taskContext) =>
-        taskContext.addTaskCompletionListener(_ => 
blockManager.releaseLock(blockId))
+        taskContext.addTaskCompletionListener[Unit](_ => 
blockManager.releaseLock(blockId))
       case None =>
         // This should only happen on the driver, where broadcast variables 
may be accessed
         // outside of running tasks (e.g. when computing rdd.partitions()). In 
order to allow

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 44895ab..3974580 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -278,7 +278,7 @@ class HadoopRDD[K, V](
             null
         }
       // Register an on-task-completion callback to close the input stream.
-      context.addTaskCompletionListener { context =>
+      context.addTaskCompletionListener[Unit] { context =>
         // Update the bytes read before closing is to make sure lingering 
bytesRead statistics in
         // this thread get correctly added.
         updateBytesRead()

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index aab46b8..56ef3e1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -77,7 +77,7 @@ class JdbcRDD[T: ClassTag](
 
   override def compute(thePart: Partition, context: TaskContext): Iterator[T] 
= new NextIterator[T]
   {
-    context.addTaskCompletionListener{ context => closeIfNeeded() }
+    context.addTaskCompletionListener[Unit]{ context => closeIfNeeded() }
     val part = thePart.asInstanceOf[JdbcPartition]
     val conn = getConnection()
     val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY)

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index ff66a04..2d66d25 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -214,7 +214,7 @@ class NewHadoopRDD[K, V](
         }
 
       // Register an on-task-completion callback to close the input stream.
-      context.addTaskCompletionListener { context =>
+      context.addTaskCompletionListener[Unit] { context =>
         // Update the bytesRead before closing is to make sure lingering 
bytesRead statistics in
         // this thread get correctly added.
         updateBytesRead()

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 979152b..8273d8a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -300,7 +300,7 @@ private[spark] object ReliableCheckpointRDD extends Logging 
{
     val deserializeStream = serializer.deserializeStream(fileInputStream)
 
     // Register an on-task-completion callback to close the input stream.
-    context.addTaskCompletionListener(context => deserializeStream.close())
+    context.addTaskCompletionListener[Unit](context => 
deserializeStream.close())
 
     deserializeStream.asIterator.asInstanceOf[Iterator[T]]
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala 
b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index 4103dfb..74b0e0b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -104,7 +104,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
         context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
         
context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
         // Use completion callback to stop sorter if task was 
finished/cancelled.
-        context.addTaskCompletionListener(_ => {
+        context.addTaskCompletionListener[Unit](_ => {
           sorter.stop()
         })
         CompletionIterator[Product2[K, C], Iterator[Product2[K, 
C]]](sorter.iterator, sorter.stop())

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index b318623..00d01dd 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -346,7 +346,7 @@ final class ShuffleBlockFetcherIterator(
 
   private[this] def initialize(): Unit = {
     // Add a task completion callback (called in both success case and failure 
case) to cleanup.
-    context.addTaskCompletionListener(_ => cleanup())
+    context.addTaskCompletionListener[Unit](_ => cleanup())
 
     // Split local and remote blocks.
     val remoteRequests = splitLocalRemoteBlocks()

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala 
b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 4cc5bcb..06fd56e 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -827,7 +827,7 @@ private[storage] class PartiallySerializedBlock[T](
   // completion listener here in order to ensure that `unrolled.dispose()` is 
called at least once.
   // The dispose() method is idempotent, so it's safe to call it 
unconditionally.
   Option(TaskContext.get()).foreach { taskContext =>
-    taskContext.addTaskCompletionListener { _ =>
+    taskContext.addTaskCompletionListener[Unit] { _ =>
       // When a task completes, its unroll memory will automatically be freed. 
Thus we do not call
       // releaseUnrollMemoryForThisTask() here because we want to avoid 
double-freeing.
       unrolledBuffer.dispose()

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala 
b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 073d71c..d8c840c 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.lang.invoke.SerializedLambda
 
 import scala.collection.mutable.{Map, Set, Stack}
 import scala.language.existentials
@@ -33,6 +34,8 @@ import org.apache.spark.internal.Logging
  */
 private[spark] object ClosureCleaner extends Logging {
 
+  private val isScala2_11 = 
scala.util.Properties.versionString.contains("2.11")
+
   // Get an ASM class reader for a given class from the JAR that loaded it
   private[util] def getClassReader(cls: Class[_]): ClassReader = {
     // Copy data over, before delegating to ClassReader - else we can run out 
of open file handles.
@@ -160,6 +163,42 @@ private[spark] object ClosureCleaner extends Logging {
   }
 
   /**
+   * Try to get a serialized Lambda from the closure.
+   *
+   * @param closure the closure to check.
+   */
+  private def getSerializedLambda(closure: AnyRef): Option[SerializedLambda] = 
{
+    if (isScala2_11) {
+      return None
+    }
+    val isClosureCandidate =
+      closure.getClass.isSynthetic &&
+        closure
+          .getClass
+          .getInterfaces.exists(_.getName.equals("scala.Serializable"))
+
+    if (isClosureCandidate) {
+      try {
+        Option(inspect(closure))
+      } catch {
+        case e: Exception =>
+          // no need to check if debug is enabled here the Spark
+          // logging api covers this.
+          logDebug("Closure is not a serialized lambda.", e)
+          None
+      }
+    } else {
+      None
+    }
+  }
+
+  private def inspect(closure: AnyRef): SerializedLambda = {
+    val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
+    writeReplace.setAccessible(true)
+    
writeReplace.invoke(closure).asInstanceOf[java.lang.invoke.SerializedLambda]
+  }
+
+  /**
    * Helper method to clean the given closure in place.
    *
    * The mechanism is to traverse the hierarchy of enclosing closures and null 
out any
@@ -206,7 +245,12 @@ private[spark] object ClosureCleaner extends Logging {
       cleanTransitively: Boolean,
       accessedFields: Map[Class[_], Set[String]]): Unit = {
 
-    if (!isClosure(func.getClass)) {
+    // most likely to be the case with 2.12, 2.13
+    // so we check first
+    // non LMF-closures should be less frequent from now on
+    val lambdaFunc = getSerializedLambda(func)
+
+    if (!isClosure(func.getClass) && lambdaFunc.isEmpty) {
       logDebug(s"Expected a closure; got ${func.getClass.getName}")
       return
     }
@@ -218,118 +262,132 @@ private[spark] object ClosureCleaner extends Logging {
       return
     }
 
-    logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}) +++")
-
-    // A list of classes that represents closures enclosed in the given one
-    val innerClasses = getInnerClosureClasses(func)
-
-    // A list of enclosing objects and their respective classes, from 
innermost to outermost
-    // An outer object at a given index is of type outer class at the same 
index
-    val (outerClasses, outerObjects) = getOuterClassesAndObjects(func)
-
-    // For logging purposes only
-    val declaredFields = func.getClass.getDeclaredFields
-    val declaredMethods = func.getClass.getDeclaredMethods
-
-    if (log.isDebugEnabled) {
-      logDebug(" + declared fields: " + declaredFields.size)
-      declaredFields.foreach { f => logDebug("     " + f) }
-      logDebug(" + declared methods: " + declaredMethods.size)
-      declaredMethods.foreach { m => logDebug("     " + m) }
-      logDebug(" + inner classes: " + innerClasses.size)
-      innerClasses.foreach { c => logDebug("     " + c.getName) }
-      logDebug(" + outer classes: " + outerClasses.size)
-      outerClasses.foreach { c => logDebug("     " + c.getName) }
-      logDebug(" + outer objects: " + outerObjects.size)
-      outerObjects.foreach { o => logDebug("     " + o) }
-    }
+    if (lambdaFunc.isEmpty) {
+      logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}) +++")
+
+      // A list of classes that represents closures enclosed in the given one
+      val innerClasses = getInnerClosureClasses(func)
+
+      // A list of enclosing objects and their respective classes, from 
innermost to outermost
+      // An outer object at a given index is of type outer class at the same 
index
+      val (outerClasses, outerObjects) = getOuterClassesAndObjects(func)
+
+      // For logging purposes only
+      val declaredFields = func.getClass.getDeclaredFields
+      val declaredMethods = func.getClass.getDeclaredMethods
+
+      if (log.isDebugEnabled) {
+        logDebug(s" + declared fields: ${declaredFields.size}")
+        declaredFields.foreach { f => logDebug(s"     $f") }
+        logDebug(s" + declared methods: ${declaredMethods.size}")
+        declaredMethods.foreach { m => logDebug(s"     $m") }
+        logDebug(s" + inner classes: ${innerClasses.size}")
+        innerClasses.foreach { c => logDebug(s"     ${c.getName}") }
+        logDebug(s" + outer classes: ${outerClasses.size}" )
+        outerClasses.foreach { c => logDebug(s"     ${c.getName}") }
+        logDebug(s" + outer objects: ${outerObjects.size}")
+        outerObjects.foreach { o => logDebug(s"     $o") }
+      }
 
-    // Fail fast if we detect return statements in closures
-    getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)
-
-    // If accessed fields is not populated yet, we assume that
-    // the closure we are trying to clean is the starting one
-    if (accessedFields.isEmpty) {
-      logDebug(s" + populating accessed fields because this is the starting 
closure")
-      // Initialize accessed fields with the outer classes first
-      // This step is needed to associate the fields to the correct classes 
later
-      initAccessedFields(accessedFields, outerClasses)
-
-      // Populate accessed fields by visiting all fields and methods accessed 
by this and
-      // all of its inner closures. If transitive cleaning is enabled, this 
may recursively
-      // visits methods that belong to other classes in search of transitively 
referenced fields.
-      for (cls <- func.getClass :: innerClasses) {
-        getClassReader(cls).accept(new FieldAccessFinder(accessedFields, 
cleanTransitively), 0)
+      // Fail fast if we detect return statements in closures
+      getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)
+
+      // If accessed fields is not populated yet, we assume that
+      // the closure we are trying to clean is the starting one
+      if (accessedFields.isEmpty) {
+        logDebug(" + populating accessed fields because this is the starting 
closure")
+        // Initialize accessed fields with the outer classes first
+        // This step is needed to associate the fields to the correct classes 
later
+        initAccessedFields(accessedFields, outerClasses)
+
+        // Populate accessed fields by visiting all fields and methods 
accessed by this and
+        // all of its inner closures. If transitive cleaning is enabled, this 
may recursively
+        // visits methods that belong to other classes in search of 
transitively referenced fields.
+        for (cls <- func.getClass :: innerClasses) {
+          getClassReader(cls).accept(new FieldAccessFinder(accessedFields, 
cleanTransitively), 0)
+        }
       }
-    }
 
-    logDebug(s" + fields accessed by starting closure: " + accessedFields.size)
-    accessedFields.foreach { f => logDebug("     " + f) }
-
-    // List of outer (class, object) pairs, ordered from outermost to innermost
-    // Note that all outer objects but the outermost one (first one in this 
list) must be closures
-    var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip 
outerObjects).reverse
-    var parent: AnyRef = null
-    if (outerPairs.size > 0) {
-      val (outermostClass, outermostObject) = outerPairs.head
-      if (isClosure(outermostClass)) {
-        logDebug(s" + outermost object is a closure, so we clone it: 
${outerPairs.head}")
-      } else if (outermostClass.getName.startsWith("$line")) {
-        // SPARK-14558: if the outermost object is a REPL line object, we 
should clone and clean it
-        // as it may carray a lot of unnecessary information, e.g. hadoop 
conf, spark conf, etc.
-        logDebug(s" + outermost object is a REPL line object, so we clone it: 
${outerPairs.head}")
+      logDebug(s" + fields accessed by starting closure: " + 
accessedFields.size)
+      accessedFields.foreach { f => logDebug("     " + f) }
+
+      // List of outer (class, object) pairs, ordered from outermost to 
innermost
+      // Note that all outer objects but the outermost one (first one in this 
list) must be closures
+      var outerPairs: List[(Class[_], AnyRef)] = 
outerClasses.zip(outerObjects).reverse
+      var parent: AnyRef = null
+      if (outerPairs.nonEmpty) {
+        val (outermostClass, outermostObject) = outerPairs.head
+        if (isClosure(outermostClass)) {
+          logDebug(s" + outermost object is a closure, so we clone it: 
${outerPairs.head}")
+        } else if (outermostClass.getName.startsWith("$line")) {
+          // SPARK-14558: if the outermost object is a REPL line object, we 
should clone
+          // and clean it as it may carray a lot of unnecessary information,
+          // e.g. hadoop conf, spark conf, etc.
+          logDebug(s" + outermost object is a REPL line object, so we clone 
it: ${outerPairs.head}")
+        } else {
+          // The closure is ultimately nested inside a class; keep the object 
of that
+          // class without cloning it since we don't want to clone the user's 
objects.
+          // Note that we still need to keep around the outermost object 
itself because
+          // we need it to clone its child closure later (see below).
+          logDebug(" + outermost object is not a closure or REPL line object," 
+
+            "so do not clone it: " +  outerPairs.head)
+          parent = outermostObject // e.g. SparkContext
+          outerPairs = outerPairs.tail
+        }
       } else {
-        // The closure is ultimately nested inside a class; keep the object of 
that
-        // class without cloning it since we don't want to clone the user's 
objects.
-        // Note that we still need to keep around the outermost object itself 
because
-        // we need it to clone its child closure later (see below).
-        logDebug(" + outermost object is not a closure or REPL line object, so 
do not clone it: " +
-          outerPairs.head)
-        parent = outermostObject // e.g. SparkContext
-        outerPairs = outerPairs.tail
+        logDebug(" + there are no enclosing objects!")
       }
-    } else {
-      logDebug(" + there are no enclosing objects!")
-    }
 
-    // Clone the closure objects themselves, nulling out any fields that are 
not
-    // used in the closure we're working on or any of its inner closures.
-    for ((cls, obj) <- outerPairs) {
-      logDebug(s" + cloning the object $obj of class ${cls.getName}")
-      // We null out these unused references by cloning each object and then 
filling in all
-      // required fields from the original object. We need the parent here 
because the Java
-      // language specification requires the first constructor parameter of 
any closure to be
-      // its enclosing object.
-      val clone = cloneAndSetFields(parent, obj, cls, accessedFields)
-
-      // If transitive cleaning is enabled, we recursively clean any enclosing 
closure using
-      // the already populated accessed fields map of the starting closure
-      if (cleanTransitively && isClosure(clone.getClass)) {
-        logDebug(s" + cleaning cloned closure $clone recursively 
(${cls.getName})")
-        // No need to check serializable here for the outer closures because 
we're
-        // only interested in the serializability of the starting closure
-        clean(clone, checkSerializable = false, cleanTransitively, 
accessedFields)
+      // Clone the closure objects themselves, nulling out any fields that are 
not
+      // used in the closure we're working on or any of its inner closures.
+      for ((cls, obj) <- outerPairs) {
+        logDebug(s" + cloning the object $obj of class ${cls.getName}")
+        // We null out these unused references by cloning each object and then 
filling in all
+        // required fields from the original object. We need the parent here 
because the Java
+        // language specification requires the first constructor parameter of 
any closure to be
+        // its enclosing object.
+        val clone = cloneAndSetFields(parent, obj, cls, accessedFields)
+
+        // If transitive cleaning is enabled, we recursively clean any 
enclosing closure using
+        // the already populated accessed fields map of the starting closure
+        if (cleanTransitively && isClosure(clone.getClass)) {
+          logDebug(s" + cleaning cloned closure $clone recursively 
(${cls.getName})")
+          // No need to check serializable here for the outer closures because 
we're
+          // only interested in the serializability of the starting closure
+          clean(clone, checkSerializable = false, cleanTransitively, 
accessedFields)
+        }
+        parent = clone
       }
-      parent = clone
-    }
 
-    // Update the parent pointer ($outer) of this closure
-    if (parent != null) {
-      val field = func.getClass.getDeclaredField("$outer")
-      field.setAccessible(true)
-      // If the starting closure doesn't actually need our enclosing object, 
then just null it out
-      if (accessedFields.contains(func.getClass) &&
-        !accessedFields(func.getClass).contains("$outer")) {
-        logDebug(s" + the starting closure doesn't actually need $parent, so 
we null it out")
-        field.set(func, null)
-      } else {
-        // Update this closure's parent pointer to point to our enclosing 
object,
-        // which could either be a cloned closure or the original user object
-        field.set(func, parent)
+      // Update the parent pointer ($outer) of this closure
+      if (parent != null) {
+        val field = func.getClass.getDeclaredField("$outer")
+        field.setAccessible(true)
+        // If the starting closure doesn't actually need our enclosing object, 
then just null it out
+        if (accessedFields.contains(func.getClass) &&
+          !accessedFields(func.getClass).contains("$outer")) {
+          logDebug(s" + the starting closure doesn't actually need $parent, so 
we null it out")
+          field.set(func, null)
+        } else {
+          // Update this closure's parent pointer to point to our enclosing 
object,
+          // which could either be a cloned closure or the original user object
+          field.set(func, parent)
+        }
       }
-    }
 
-    logDebug(s" +++ closure $func (${func.getClass.getName}) is now cleaned 
+++")
+      logDebug(s" +++ closure $func (${func.getClass.getName}) is now cleaned 
+++")
+    } else {
+      logDebug(s"Cleaning lambda: ${lambdaFunc.get.getImplMethodName}")
+
+      // scalastyle:off classforname
+      val captClass = 
Class.forName(lambdaFunc.get.getCapturingClass.replace('/', '.'),
+        false, Thread.currentThread.getContextClassLoader)
+      // scalastyle:on classforname
+      // Fail fast if we detect return statements in closures
+      getClassReader(captClass)
+        .accept(new 
ReturnStatementFinder(Some(lambdaFunc.get.getImplMethodName)), 0)
+      logDebug(s" +++ Lambda closure (${lambdaFunc.get.getImplMethodName}) is 
now cleaned +++")
+    }
 
     if (checkSerializable) {
       ensureSerializable(func)
@@ -366,14 +424,24 @@ private[spark] object ClosureCleaner extends Logging {
 private[spark] class ReturnStatementInClosureException
   extends SparkException("Return statements aren't allowed in Spark closures")
 
-private class ReturnStatementFinder extends ClassVisitor(ASM5) {
+private class ReturnStatementFinder(targetMethodName: Option[String] = None)
+  extends ClassVisitor(ASM5) {
   override def visitMethod(access: Int, name: String, desc: String,
       sig: String, exceptions: Array[String]): MethodVisitor = {
+
     // $anonfun$ covers Java 8 lambdas
     if (name.contains("apply") || name.contains("$anonfun$")) {
+      // A method with suffix "$adapted" will be generated in cases like
+      // { _:Int => return; Seq()} but not { _:Int => return; true}
+      // closure passed is $anonfun$t$1$adapted while actual code resides in 
$anonfun$s$1
+      // visitor will see only $anonfun$s$1$adapted, so we remove the suffix, 
see
+      // https://github.com/scala/scala-dev/issues/109
+      val isTargetMethod = targetMethodName.isEmpty ||
+        name == targetMethodName.get || name == 
targetMethodName.get.stripSuffix("$adapted")
+
       new MethodVisitor(ASM5) {
         override def visitTypeInsn(op: Int, tp: String) {
-          if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) 
{
+          if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl") 
&& isTargetMethod) {
             throw new ReturnStatementInClosureException
           }
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 5c6dd45..d83da0d 100644
--- 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -565,7 +565,7 @@ class ExternalAppendOnlyMap[K, V, C](
       }
     }
 
-    context.addTaskCompletionListener(context => cleanup())
+    context.addTaskCompletionListener[Unit](context => cleanup())
   }
 
   private[this] class SpillableIterator(var upstream: Iterator[(K, C)])

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala 
b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index 9a19bae..a0010f1 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -121,6 +121,7 @@ class ClosureCleanerSuite extends SparkFunSuite {
   }
 
   test("SPARK-22328: ClosureCleaner misses referenced superclass fields: case 
1") {
+    assume(!ClosureCleanerSuite2.supportsLMFs)
     val concreteObject = new TestAbstractClass {
       val n2 = 222
       val s2 = "bbb"
@@ -141,6 +142,7 @@ class ClosureCleanerSuite extends SparkFunSuite {
   }
 
   test("SPARK-22328: ClosureCleaner misses referenced superclass fields: case 
2") {
+    assume(!ClosureCleanerSuite2.supportsLMFs)
     val concreteObject = new TestAbstractClass2 {
       val n2 = 222
       val s2 = "bbb"
@@ -154,6 +156,7 @@ class ClosureCleanerSuite extends SparkFunSuite {
   }
 
   test("SPARK-22328: multiple outer classes have the same parent class") {
+    assume(!ClosureCleanerSuite2.supportsLMFs)
     val concreteObject = new TestAbstractClass2 {
 
       val innerObject = new TestAbstractClass2 {

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala 
b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
index 278fada..96da8ec 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
@@ -145,6 +145,7 @@ class ClosureCleanerSuite2 extends SparkFunSuite with 
BeforeAndAfterAll with Pri
   }
 
   test("get inner closure classes") {
+    assume(!ClosureCleanerSuite2.supportsLMFs)
     val closure1 = () => 1
     val closure2 = () => { () => 1 }
     val closure3 = (i: Int) => {
@@ -171,6 +172,7 @@ class ClosureCleanerSuite2 extends SparkFunSuite with 
BeforeAndAfterAll with Pri
   }
 
   test("get outer classes and objects") {
+    assume(!ClosureCleanerSuite2.supportsLMFs)
     val localValue = someSerializableValue
     val closure1 = () => 1
     val closure2 = () => localValue
@@ -207,6 +209,7 @@ class ClosureCleanerSuite2 extends SparkFunSuite with 
BeforeAndAfterAll with Pri
   }
 
   test("get outer classes and objects with nesting") {
+    assume(!ClosureCleanerSuite2.supportsLMFs)
     val localValue = someSerializableValue
 
     val test1 = () => {
@@ -258,6 +261,7 @@ class ClosureCleanerSuite2 extends SparkFunSuite with 
BeforeAndAfterAll with Pri
   }
 
   test("find accessed fields") {
+    assume(!ClosureCleanerSuite2.supportsLMFs)
     val localValue = someSerializableValue
     val closure1 = () => 1
     val closure2 = () => localValue
@@ -296,6 +300,7 @@ class ClosureCleanerSuite2 extends SparkFunSuite with 
BeforeAndAfterAll with Pri
   }
 
   test("find accessed fields with nesting") {
+    assume(!ClosureCleanerSuite2.supportsLMFs)
     val localValue = someSerializableValue
 
     val test1 = () => {
@@ -538,17 +543,22 @@ class ClosureCleanerSuite2 extends SparkFunSuite with 
BeforeAndAfterAll with Pri
       // As before, this closure is neither serializable nor cleanable
       verifyCleaning(inner1, serializableBefore = false, serializableAfter = 
false)
 
-      // This closure is no longer serializable because it now has a pointer 
to the outer closure,
-      // which is itself not serializable because it has a pointer to the 
ClosureCleanerSuite2.
-      // If we do not clean transitively, we will not null out this indirect 
reference.
-      verifyCleaning(
-        inner2, serializableBefore = false, serializableAfter = false, 
transitive = false)
-
-      // If we clean transitively, we will find that method `a` does not 
actually reference the
-      // outer closure's parent (i.e. the ClosureCleanerSuite), so we can 
additionally null out
-      // the outer closure's parent pointer. This will make `inner2` 
serializable.
-      verifyCleaning(
-        inner2, serializableBefore = false, serializableAfter = true, 
transitive = true)
+      if (ClosureCleanerSuite2.supportsLMFs) {
+        verifyCleaning(
+          inner2, serializableBefore = true, serializableAfter = true)
+      } else {
+        // This closure is no longer serializable because it now has a pointer 
to the outer closure,
+        // which is itself not serializable because it has a pointer to the 
ClosureCleanerSuite2.
+        // If we do not clean transitively, we will not null out this indirect 
reference.
+        verifyCleaning(
+          inner2, serializableBefore = false, serializableAfter = false, 
transitive = false)
+
+        // If we clean transitively, we will find that method `a` does not 
actually reference the
+        // outer closure's parent (i.e. the ClosureCleanerSuite), so we can 
additionally null out
+        // the outer closure's parent pointer. This will make `inner2` 
serializable.
+        verifyCleaning(
+          inner2, serializableBefore = false, serializableAfter = true, 
transitive = true)
+      }
     }
 
     // Same as above, but with more levels of nesting
@@ -565,4 +575,25 @@ class ClosureCleanerSuite2 extends SparkFunSuite with 
BeforeAndAfterAll with Pri
     test6()()()
   }
 
+  test("verify nested non-LMF closures") {
+    assume(ClosureCleanerSuite2.supportsLMFs)
+    class A1(val f: Int => Int)
+    class A2(val f: Int => Int => Int)
+    class B extends A1(x => x*x)
+    class C extends A2(x => new B().f )
+    val closure1 = new B().f
+    val closure2 = new C().f
+    // serializable already
+    verifyCleaning(closure1, serializableBefore = true, serializableAfter = 
true)
+    // brings in deps that can't be cleaned
+    verifyCleaning(closure2, serializableBefore = false, serializableAfter = 
false)
+  }
+}
+
+object ClosureCleanerSuite2 {
+  // Scala 2.12 allows better interop with Java 8 via lambda syntax. This is 
supported
+  // by implementing FunctionN classes in Scala’s standard library as Single 
Abstract
+  // Method (SAM) types. Lambdas are implemented via the invokedynamic 
instruction and
+  // the use of the LambdaMetaFactory (LMF) mechanism.
+  val supportsLMFs = scala.util.Properties.versionString.contains("2.12")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 7db452b..6776516 100755
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -179,7 +179,7 @@ private[avro] class AvroFileFormat extends FileFormat
         // Ensure that the reader is closed even if the task fails or doesn't 
consume the entire
         // iterator of records.
         Option(TaskContext.get()).foreach { taskContext =>
-          taskContext.addTaskCompletionListener { _ =>
+          taskContext.addTaskCompletionListener[Unit] { _ =>
             reader.close()
           }
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
index 498e344..53bd9a9 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
@@ -166,7 +166,7 @@ private[kafka010] class KafkaSourceRDD(
         }
       }
       // Release consumer, either by removing it or indicating we're no longer 
using it
-      context.addTaskCompletionListener { _ =>
+      context.addTaskCompletionListener[Unit] { _ =>
         underlying.closeIfNeeded()
       }
       underlying

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
 
b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index 3efc90f..4513dca 100644
--- 
a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ 
b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -237,7 +237,7 @@ private class KafkaRDDIterator[K, V](
   cacheLoadFactor: Float
 ) extends Iterator[ConsumerRecord[K, V]] {
 
-  context.addTaskCompletionListener(_ => closeIfNeeded())
+  context.addTaskCompletionListener[Unit](_ => closeIfNeeded())
 
   val consumer = {
     KafkaDataConsumer.init(cacheInitialCapacity, cacheMaxCapacity, 
cacheLoadFactor)

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala 
b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 4e84ff0..39dcd91 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -154,7 +154,7 @@ private[libsvm] class LibSVMFileFormat
 
     (file: PartitionedFile) => {
       val linesReader = new HadoopFileLinesReader(file, 
broadcastedHadoopConf.value.value)
-      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => 
linesReader.close()))
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => 
linesReader.close()))
 
       val points = linesReader
           .map(_.toString.trim)

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index c191123..72505f7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -372,7 +372,7 @@ class TungstenAggregationIterator(
     }
   }
 
-  TaskContext.get().addTaskCompletionListener(_ => {
+  TaskContext.get().addTaskCompletionListener[Unit](_ => {
     // At the end of the task, update the task's peak memory usage. Since we 
destroy
     // the map to create the sorter, their memory usages should not overlap, 
so it is safe
     // to just use the max of the two.

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index 7487564..501520c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -86,7 +86,7 @@ private[sql] object ArrowConverters {
     val root = VectorSchemaRoot.create(arrowSchema, allocator)
     val arrowWriter = ArrowWriter.create(root)
 
-    context.addTaskCompletionListener { _ =>
+    context.addTaskCompletionListener[Unit] { _ =>
       root.close()
       allocator.close()
     }
@@ -137,7 +137,7 @@ private[sql] object ArrowConverters {
       private var schemaRead = StructType(Seq.empty)
       private var rowIter = if (payloadIter.hasNext) nextBatch() else 
Iterator.empty
 
-      context.addTaskCompletionListener { _ =>
+      context.addTaskCompletionListener[Unit] { _ =>
         closeReader()
         allocator.close()
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 6012aba..196d057 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -97,7 +97,7 @@ case class InMemoryTableScanExec(
         columnarBatch.column(i).asInstanceOf[WritableColumnVector],
         columnarBatchSchema.fields(i).dataType, rowCount)
     }
-    taskContext.foreach(_.addTaskCompletionListener(_ => 
columnarBatch.close()))
+    taskContext.foreach(_.addTaskCompletionListener[Unit](_ => 
columnarBatch.close()))
     columnarBatch
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala
index c0df6c7..9fddfad 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala
@@ -50,7 +50,7 @@ object CodecStreams {
    */
   def createInputStreamWithCloseResource(config: Configuration, path: Path): 
InputStream = {
     val inputStream = createInputStream(config, path)
-    Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => 
inputStream.close()))
+    Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => 
inputStream.close()))
     inputStream
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 28c36b6..99fc78f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -214,7 +214,7 @@ class FileScanRDD(
     }
 
     // Register an on-task-completion callback to close the input stream.
-    context.addTaskCompletionListener(_ => iterator.close())
+    context.addTaskCompletionListener[Unit](_ => iterator.close())
 
     iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack.
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 82322df..b7b46c7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -214,7 +214,7 @@ object TextInputCSVDataSource extends CSVDataSource {
       caseSensitive: Boolean): Iterator[InternalRow] = {
     val lines = {
       val linesReader = new HadoopFileLinesReader(file, conf)
-      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => 
linesReader.close()))
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => 
linesReader.close()))
       linesReader.map { line =>
         new String(line.getBytes, 0, line.getLength, parser.options.charset)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 1b3b17c..16b4938 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -265,7 +265,7 @@ private[jdbc] class JDBCRDD(
       closed = true
     }
 
-    context.addTaskCompletionListener{ context => close() }
+    context.addTaskCompletionListener[Unit]{ context => close() }
 
     val inputMetrics = context.taskMetrics().inputMetrics
     val part = thePart.asInstanceOf[JDBCPartition]

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 2fee212..d6c5888 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -130,7 +130,7 @@ object TextInputJsonDataSource extends JsonDataSource {
       parser: JacksonParser,
       schema: StructType): Iterator[InternalRow] = {
     val linesReader = new HadoopFileLinesReader(file, 
parser.options.lineSeparatorInRead, conf)
-    Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => 
linesReader.close()))
+    Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => 
linesReader.close()))
     val textParser = parser.options.encoding
       .map(enc => CreateJacksonParser.text(enc, _: JsonFactory, _: Text))
       .getOrElse(CreateJacksonParser.text(_: JsonFactory, _: Text))

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index df1cebe..4574f82 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -205,7 +205,7 @@ class OrcFileFormat
           // There is a possibility that `initialize` and `initBatch` hit some 
errors (like OOM)
           // after opening a file.
           val iter = new RecordReaderIterator(batchReader)
-          Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => 
iter.close()))
+          
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => 
iter.close()))
 
           batchReader.initialize(fileSplit, taskAttemptContext)
           batchReader.initBatch(
@@ -220,7 +220,7 @@ class OrcFileFormat
           val orcRecordReader = new OrcInputFormat[OrcStruct]
             .createRecordReader(fileSplit, taskAttemptContext)
           val iter = new RecordReaderIterator[OrcStruct](orcRecordReader)
-          Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => 
iter.close()))
+          
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => 
iter.close()))
 
           val fullSchema = requiredSchema.toAttributes ++ 
partitionSchema.toAttributes
           val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, 
fullSchema)

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 2d4ac76..283d776 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -411,7 +411,7 @@ class ParquetFileFormat
           convertTz.orNull, enableOffHeapColumnVector && 
taskContext.isDefined, capacity)
         val iter = new RecordReaderIterator(vectorizedReader)
         // SPARK-23457 Register a task completion lister before 
`initialization`.
-        taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
+        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => 
iter.close()))
         vectorizedReader.initialize(split, hadoopAttemptContext)
         logDebug(s"Appending $partitionSchema ${file.partitionValues}")
         vectorizedReader.initBatch(partitionSchema, file.partitionValues)
@@ -432,7 +432,7 @@ class ParquetFileFormat
         }
         val iter = new RecordReaderIterator(reader)
         // SPARK-23457 Register a task completion listener before `initialization`.
-        taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
+        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
         reader.initialize(split, hadoopAttemptContext)
 
         val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 8661a53..2682971 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -120,7 +120,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
       } else {
         new HadoopFileWholeTextReader(file, confValue)
       }
-      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => reader.close()))
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => reader.close()))
       if (requiredSchema.isEmpty) {
         val emptyUnsafeRow = new UnsafeRow(0)
         reader.map(_ => emptyUnsafeRow)

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
index 7ea5342..7828298 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
@@ -40,7 +40,7 @@ class DataSourceRDD[T: ClassTag](
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
     val reader = split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition
         .createPartitionReader()
-    context.addTaskCompletionListener(_ => reader.close())
+    context.addTaskCompletionListener[Unit](_ => reader.close())
     val iter = new Iterator[T] {
       private[this] var valuePrepared = false
 

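[Editor's note] Every hunk above and below takes the same shape: the cleanup closure is registered through `addTaskCompletionListener` with an explicit `[Unit]` type argument. As a reading aid only, here is a minimal sketch of that pattern as a standalone helper, assuming spark-core on the classpath; `CleanupSketch`, `closeOnTaskCompletion`, and `resource` are illustrative names, not code from this commit:

```
import java.io.Closeable

import org.apache.spark.TaskContext

object CleanupSketch {
  // Register `resource.close()` to run when the current task finishes, if a
  // task is active. The explicit [Unit] type argument mirrors the call sites
  // in this diff.
  def closeOnTaskCompletion(resource: Closeable): Unit = {
    Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit] { _ =>
      resource.close()
    })
  }
}
```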
http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 0396168..dab873b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -214,7 +214,7 @@ trait HashJoin {
     }
 
     // At the end of the task, we update the avg hash probe.
-    TaskContext.get().addTaskCompletionListener(_ =>
+    TaskContext.get().addTaskCompletionListener[Unit](_ =>
       avgHashProbe.set(hashed.getAverageProbesPerLookup))
 
     val resultProj = createResultProjection

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index 897a4da..2b59ed6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -57,7 +57,7 @@ case class ShuffledHashJoinExec(
     buildTime += (System.nanoTime() - start) / 1000000
     buildDataSize += relation.estimatedSize
     // This relation is usually used until the end of task.
-    context.addTaskCompletionListener(_ => relation.close())
+    context.addTaskCompletionListener[Unit](_ => relation.close())
     relation
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
index d00f6f0..88c9c02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
@@ -125,7 +125,7 @@ case class AggregateInPandasExec(
       // combine input with output from Python.
       val queue = HybridRowQueue(context.taskMemoryManager(),
         new File(Utils.getLocalDir(SparkEnv.get.conf)), groupingExpressions.length)
-      context.addTaskCompletionListener { _ =>
+      context.addTaskCompletionListener[Unit] { _ =>
         queue.close()
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index ca66565..85b1871 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -131,7 +131,7 @@ class ArrowPythonRunner(
       private var schema: StructType = _
       private var vectors: Array[ColumnVector] = _
 
-      context.addTaskCompletionListener { _ =>
+      context.addTaskCompletionListener[Unit] { _ =>
         if (reader != null) {
           reader.close(false)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index 860dc78..04c7dfd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -97,7 +97,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
       // combine input with output from Python.
       val queue = HybridRowQueue(context.taskMemoryManager(),
         new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
-      context.addTaskCompletionListener { ctx =>
+      context.addTaskCompletionListener[Unit] { ctx =>
         queue.close()
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
index a587731..f08f816 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
@@ -56,7 +56,7 @@ class PythonForeachWriter(func: PythonFunction, schema: StructType)
 
   override def open(partitionId: Long, version: Long): Boolean = {
     outputIterator  // initialize everything
-    TaskContext.get.addTaskCompletionListener { _ => buffer.close() }
+    TaskContext.get.addTaskCompletionListener[Unit] { _ => buffer.close() }
     true
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
index 628029b..47bfbde 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
@@ -142,7 +142,7 @@ case class WindowInPandasExec(
       // combine input with output from Python.
       val queue = HybridRowQueue(context.taskMemoryManager(),
         new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
-      context.addTaskCompletionListener { _ =>
+      context.addTaskCompletionListener[Unit] { _ =>
         queue.close()
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
index 10d8fc5..aec756c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
@@ -40,7 +40,7 @@ case class ContinuousCoalesceRDDPartition(
       queueSize, numShuffleWriters, epochIntervalMs, env)
     val endpoint = env.setupEndpoint(endpointName, receiver)
 
-    TaskContext.get().addTaskCompletionListener { ctx =>
+    TaskContext.get().addTaskCompletionListener[Unit] { ctx =>
       env.stop(endpoint)
     }
     (receiver, endpoint)
@@ -118,9 +118,8 @@ class ContinuousCoalesceRDD(
         }
       }
 
-      context.addTaskCompletionListener { ctx =>
+      context.addTaskCompletionListener[Unit] { ctx =>
         threadPool.shutdownNow()
-        ()
       }
 
       part.writersInitialized = true

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
index bfb8705..ec1dabd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
@@ -70,7 +70,7 @@ class ContinuousQueuedDataReader(
   dataReaderThread.setDaemon(true)
   dataReaderThread.start()
 
-  context.addTaskCompletionListener(_ => {
+  context.addTaskCompletionListener[Unit](_ => {
     this.close()
   })
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
index 518223f..9b13f63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
@@ -40,7 +40,7 @@ case class ContinuousShuffleReadPartition(
       queueSize, numShuffleWriters, epochIntervalMs, env)
     val endpoint = env.setupEndpoint(endpointName, receiver)
 
-    TaskContext.get().addTaskCompletionListener { ctx =>
+    TaskContext.get().addTaskCompletionListener[Unit] { ctx =>
       env.stop(endpoint)
     }
     (receiver, endpoint)

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index 6b38630..55d783e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -290,7 +290,7 @@ class SymmetricHashJoinStateManager(
   private val keyWithIndexToValue = new KeyWithIndexToValueStore()
 
   // Clean up any state store resources if necessary at the end of the task
-  Option(TaskContext.get()).foreach { _.addTaskCompletionListener { _ => abortIfNeeded() } }
+  Option(TaskContext.get()).foreach { _.addTaskCompletionListener[Unit] { _ => abortIfNeeded() } }

   /** Helper trait for invoking common functionalities of a state store. */
   private abstract class StateStoreHandler(stateStoreType: StateStoreType) extends Logging {

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
index 0b32327..b602143 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
@@ -61,7 +61,7 @@ package object state {
       val cleanedF = dataRDD.sparkContext.clean(storeUpdateFunction)
       val wrappedF = (store: StateStore, iter: Iterator[T]) => {
         // Abort the state store in case of error
-        TaskContext.get().addTaskCompletionListener(_ => {
+        TaskContext.get().addTaskCompletionListener[Unit](_ => {
           if (!store.hasCommitted) store.abort()
         })
         cleanedF(store, iter)

http://git-wip-us.apache.org/repos/asf/spark/blob/a6573699/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 2009069..de8085f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -164,7 +164,8 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
         }
 
         val recordsIterator = new RecordReaderIterator[OrcStruct](orcRecordReader)
-        Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => recordsIterator.close()))
+        Option(TaskContext.get())
+          .foreach(_.addTaskCompletionListener[Unit](_ => recordsIterator.close()))
 
         // Unwraps `OrcStruct`s to `UnsafeRow`s
         OrcFileFormat.unwrapOrcStructs(

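[Editor's note] For reference, a hedged sketch of the two call shapes `TaskContext.addTaskCompletionListener` supports after this change: a `TaskCompletionListener` object, and a closure registered with an explicit `[Unit]` type argument as at every call site in this diff. `TaskListenerSketch`, `registerBoth`, and `cleanup()` are illustrative names, not part of the commit:

```
import org.apache.spark.TaskContext
import org.apache.spark.util.TaskCompletionListener

object TaskListenerSketch {
  // Placeholder cleanup action; not a function from this commit.
  def cleanup(): Unit = ()

  def registerBoth(context: TaskContext): Unit = {
    // Listener-object form, unchanged by this diff.
    context.addTaskCompletionListener(new TaskCompletionListener {
      override def onTaskCompletion(ctx: TaskContext): Unit = cleanup()
    })
    // Closure form: the explicit [Unit] type argument pins the call to the
    // polymorphic function overload, matching the updated call sites above.
    context.addTaskCompletionListener[Unit](_ => cleanup())
  }
}
```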

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
