Repository: spark Updated Branches: refs/heads/master f3201aeeb -> 2c5af7d4d
[SPARK-13640][SQL] Synchronize ScalaReflection.mirror method. ## What changes were proposed in this pull request? `ScalaReflection.mirror` method should be synchronized when scala version is `2.10` because `universe.runtimeMirror` is not thread safe. ## How was this patch tested? I added a test to check thread safety of `ScalaRefection.mirror` method in `ScalaReflectionSuite`, which will throw the following Exception in Scala `2.10` without this patch: ``` [info] - thread safety of mirror *** FAILED *** (49 milliseconds) [info] java.lang.UnsupportedOperationException: tail of empty list [info] at scala.collection.immutable.Nil$.tail(List.scala:339) [info] at scala.collection.immutable.Nil$.tail(List.scala:334) [info] at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172) [info] at scala.reflect.internal.Symbols$Symbol.unsafeTypeParams(Symbols.scala:1477) [info] at scala.reflect.internal.Symbols$TypeSymbol.tpe(Symbols.scala:2777) [info] at scala.reflect.internal.Mirrors$RootsBase.init(Mirrors.scala:235) [info] at scala.reflect.runtime.JavaMirrors$class.createMirror(JavaMirrors.scala:34) [info] at scala.reflect.runtime.JavaMirrors$class.runtimeMirror(JavaMirrors.scala:61) [info] at scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:12) [info] at scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:12) [info] at org.apache.spark.sql.catalyst.ScalaReflection$.mirror(ScalaReflection.scala:36) [info] at org.apache.spark.sql.catalyst.ScalaReflectionSuite$$anonfun$12$$anonfun$apply$mcV$sp$1$$anonfun$apply$1$$anonfun$apply$2.apply(ScalaReflectionSuite.scala:256) [info] at org.apache.spark.sql.catalyst.ScalaReflectionSuite$$anonfun$12$$anonfun$apply$mcV$sp$1$$anonfun$apply$1$$anonfun$apply$2.apply(ScalaReflectionSuite.scala:252) [info] at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) [info] at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) [info] at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107) [info] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [info] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [info] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [info] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ``` Notice that the test will pass when Scala version is `2.11`. Author: Takuya UESHIN <ues...@happy-camper.st> Closes #11487 from ueshin/issues/SPARK-13640. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c5af7d4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c5af7d4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c5af7d4 Branch: refs/heads/master Commit: 2c5af7d4d939e18a749d33b5de2e5113aa3eff08 Parents: f3201ae Author: Takuya UESHIN <ues...@happy-camper.st> Authored: Wed Mar 9 10:23:27 2016 +0000 Committer: Sean Owen <so...@cloudera.com> Committed: Wed Mar 9 10:23:27 2016 +0000 ---------------------------------------------------------------------- .../spark/sql/catalyst/ScalaReflection.scala | 7 +++- .../sql/catalyst/ScalaReflectionSuite.scala | 41 ++++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2c5af7d4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 02cb2d9..c12b5c2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -32,8 +32,10 @@ object ScalaReflection extends ScalaReflection { // Since we are creating a runtime mirror usign the class loader of current thread, // we need to use def at here. So, every time we call mirror, it is using the // class loader of the current thread. - override def mirror: universe.Mirror = + // SPARK-13640: Synchronize this because universe.runtimeMirror is not thread-safe in Scala 2.10. + override def mirror: universe.Mirror = ScalaReflectionLock.synchronized { universe.runtimeMirror(Thread.currentThread().getContextClassLoader) + } import universe._ @@ -665,7 +667,8 @@ trait ScalaReflection { * * @see SPARK-5281 */ - def localTypeOf[T: TypeTag]: `Type` = { + // SPARK-13640: Synchronize this because TypeTag.tpe is not thread-safe in Scala 2.10. + def localTypeOf[T: TypeTag]: `Type` = ScalaReflectionLock.synchronized { val tag = implicitly[TypeTag[T]] tag.in(mirror).tpe.normalize } http://git-wip-us.apache.org/repos/asf/spark/blob/2c5af7d4/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala index 5fe09b1..dd31050 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala @@ -17,10 +17,15 @@ package org.apache.spark.sql.catalyst +import java.net.URLClassLoader import java.sql.{Date, Timestamp} +import scala.reflect.runtime.universe.typeOf + import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.expressions.BoundReference import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils case class PrimitiveData( intField: Int, @@ -236,4 +241,40 @@ class ScalaReflectionSuite extends SparkFunSuite { assert(anyTypes.forall(!_.isPrimitive)) assert(anyTypes === Seq(classOf[java.lang.Object], classOf[java.lang.Object])) } + + private val dataTypeForComplexData = dataTypeFor[ComplexData] + private val typeOfComplexData = typeOf[ComplexData] + + Seq( + ("mirror", () => mirror), + ("dataTypeFor", () => dataTypeFor[ComplexData]), + ("constructorFor", () => constructorFor[ComplexData]), + ("extractorsFor", { + val inputObject = BoundReference(0, dataTypeForComplexData, nullable = false) + () => extractorsFor[ComplexData](inputObject) + }), + ("getConstructorParameters(cls)", () => getConstructorParameters(classOf[ComplexData])), + ("getConstructorParameterNames", () => getConstructorParameterNames(classOf[ComplexData])), + ("getClassFromType", () => getClassFromType(typeOfComplexData)), + ("schemaFor", () => schemaFor[ComplexData]), + ("localTypeOf", () => localTypeOf[ComplexData]), + ("getClassNameFromType", () => getClassNameFromType(typeOfComplexData)), + ("getParameterTypes", () => getParameterTypes(() => ())), + ("getConstructorParameters(tpe)", () => getClassNameFromType(typeOfComplexData))).foreach { + case (name, exec) => + test(s"SPARK-13640: thread safety of ${name}") { + (0 until 100).foreach { _ => + val loader = new URLClassLoader(Array.empty, Utils.getContextOrSparkClassLoader) + (0 until 10).par.foreach { _ => + val cl = Thread.currentThread.getContextClassLoader + try { + Thread.currentThread.setContextClassLoader(loader) + exec() + } finally { + Thread.currentThread.setContextClassLoader(cl) + } + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org