cloud-fan commented on code in PR #56430:
URL: https://github.com/apache/spark/pull/56430#discussion_r3399599648
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala:
##########
@@ -595,7 +602,12 @@ case class NewInstance(
propagateNull: Boolean,
dataType: DataType,
outerPointer: Option[() => AnyRef]) extends InvokeLike {
+ // JVM-form binary name (with `$` for inner classes); used where literal `$`
+ // is intentional (e.g., Scala companion access `Foo$.MODULE$`).
private val className = cls.getName
+ // Dotted Java-source FQN; required by the JDK compiler in `new X(...)`
+ // expressions and casts. Equal to `className` for top-level classes.
Review Comment:
`javaSourceName` returns `cls.getName`, so this value is never dotted and
always equals `className` — the rewrite to a javac-legal form happens in
`JdkCodeCompiler` at compile time, not here.
```suggestion
// The same binary name as `className`, routed through `javaSourceName` to
mark
// it as a type reference the JDK backend rewrites to a javac-legal form at
// compile time (see JdkCodeCompiler.rewriteInnerClassRefs).
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala:
##########
@@ -194,9 +194,13 @@ case class SortPrefix(child: SortOrder) extends
UnaryExpression {
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val childCode = child.child.genCode(ctx)
val input = childCode.value
- val BinaryPrefixCmp = classOf[BinaryPrefixComparator].getName
- val DoublePrefixCmp = classOf[DoublePrefixComparator].getName
- val StringPrefixCmp = classOf[StringPrefixComparator].getName
+ // Use javaSourceName to emit the canonical binary form (e.g.
Review Comment:
"canonical" and "binary" are opposite name forms in reflection, and this
value is the binary name (`getName`).
```suggestion
// Use javaSourceName to emit the binary name form (e.g.
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeCompiler.scala:
##########
@@ -0,0 +1,1281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream,
IOException, StringWriter}
+import java.net.{JarURLConnection, URI, URL}
+import java.util.Locale
+import java.util.concurrent.{Callable, ExecutionException, ExecutorService}
+import javax.tools.{Diagnostic, DiagnosticCollector, FileObject,
ForwardingJavaFileManager, JavaCompiler, JavaFileManager, JavaFileObject,
SimpleJavaFileObject, StandardJavaFileManager, StandardLocation, ToolProvider}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+import scala.util.control.NonFatal
+
+import com.google.common.cache.{Cache, CacheBuilder}
+import com.google.common.util.concurrent.Uninterruptibles
+import org.codehaus.commons.compiler.{CompileException,
InternalCompilerException}
+import org.codehaus.janino.ClassBodyEvaluator
+import org.codehaus.janino.util.ClassFile
+import org.codehaus.janino.util.ClassFile.CodeAttribute
+
+import org.apache.spark.{JobArtifactSet, SparkEnv, TaskContext,
TaskKilledException}
+import org.apache.spark.executor.InputMetrics
+import org.apache.spark.internal.{Logging, LogKeys}
+import org.apache.spark.metrics.source.CodegenMetrics
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeArrayData,
UnsafeMapData, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.{ArrayData,
CollationAwareUTF8String, CollationFactory, CollationSupport, MapData}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.Decimal
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.types.{BinaryView, CalendarInterval,
TimestampNanosVal, UTF8String, VariantVal}
+import org.apache.spark.util.{ParentClassLoader, ThreadUtils, Utils}
+
+/**
+ * Backend used to compile generator-produced Java source into a
[[GeneratedClass]].
+ *
+ * Two implementations are provided:
+ * - [[JaninoCodeCompiler]]: the default, uses Janino's
`ClassBodyEvaluator`. Very fast.
+ * - [[JdkCodeCompiler]]: uses `javax.tools.JavaCompiler` from the JDK.
Slower (~5x for
+ * large generated units, 30-300x for small ones), but maintained on the
JDK
+ * release cadence and not subject to Janino's unmaintained-upstream risk.
+ *
+ * The backend is selected at compile time via [[SQLConf.CODEGEN_COMPILER]].
+ */
+trait CodeCompiler {
+ /** Backend name as used in `spark.sql.codegen.compiler`. */
+ def name: String
+
+ /**
+ * Compile a generator-produced class body into an instance of the
+ * [[GeneratedClass]] subclass it defines.
+ *
+ * @return the instantiated generated class along with bytecode statistics.
+ */
+ def compile(code: CodeAndComment): (GeneratedClass, ByteCodeStats)
+}
+
+object CodeCompiler extends Logging {
+
+ // Emit log messages under CodeGenerator's logger name: operators and tests
+ // (SPARK-25113 / SPARK-51527) subscribe to that exact logger for codegen
+ // compilation events, and the backends are implementation details of
+ // `CodeGenerator.compile`, so their logs belong under its name.
+ override protected def logName: String = classOf[CodeGenerator[_, _]].getName
+
+ val JANINO: String = "janino"
+ val JDK: String = "jdk"
+
+ /**
+ * Fully-qualified imports made available to generated code by both backends.
+ *
+ * For Janino these are passed to `ClassBodyEvaluator.setDefaultImports`.
+ * For the JDK backend they are rendered into `import` statements inside the
+ * synthesized compilation unit.
+ *
+ * This is the single shared list - anything added here automatically
applies to
+ * both backends. It intentionally excludes
+ * `org.apache.spark.sql.catalyst.expressions.codegen.GeneratedClass` to
avoid
+ * a name collision with the generated subclass `GeneratedClass`; the extends
+ * clause uses the fully-qualified name instead.
+ *
+ * When adding an entry, keep its SIMPLE name distinct from any `import`
line a
+ * generator emits at the top of a class body (currently only
GenerateColumnAccessor
+ * does this): javac rejects two single-type imports sharing a simple name
+ * (JLS 7.5.1) while Janino resolves them leniently, so a collision would
fail only
+ * under the JDK backend.
+ */
+ val DefaultImports: Seq[String] = Seq(
+ classOf[Platform].getName,
+ classOf[InternalRow].getName,
+ classOf[UnsafeRow].getName,
+ classOf[BinaryView].getName,
+ classOf[UTF8String].getName,
+ classOf[Decimal].getName,
+ classOf[CalendarInterval].getName,
+ classOf[TimestampNanosVal].getName,
+ classOf[VariantVal].getName,
+ classOf[ArrayData].getName,
+ classOf[UnsafeArrayData].getName,
+ classOf[MapData].getName,
+ classOf[UnsafeMapData].getName,
+ classOf[Expression].getName,
+ classOf[TaskContext].getName,
+ classOf[TaskKilledException].getName,
+ classOf[InputMetrics].getName,
+ classOf[CollationAwareUTF8String].getName,
+ classOf[CollationFactory].getName,
+ classOf[CollationSupport].getName,
+ QueryExecutionErrors.getClass.getName.stripSuffix("$")
+ )
+
+ /**
+ * FQN of the generated class. Must NOT be under the `codegen` package or
Janino
+ * fails with `java.lang.InstantiationException`. The same name is used for
both
+ * backends so generated source, logs, and diagnostics name the same class
+ * whichever backend compiles it. (Compiled results are NOT shared across
+ * backends: the compile cache key includes the backend.)
+ */
+ val GeneratedClassName: String =
+ "org.apache.spark.sql.catalyst.expressions.GeneratedClass"
+
+ def active(): CodeCompiler = active(null)
+
+ /**
+ * Resolve the active backend for the given generated unit.
+ *
+ * The configured backend ([[SQLConf.CODEGEN_COMPILER]]) governs ordinary
codegen. The
+ * exception is codegen the JDK compiler is fundamentally *incapable* of
compiling - not
+ * merely slower at - which is always routed to Janino regardless of the
configured
+ * backend. This is deterministic routing decided up front from the
execution context and
+ * the generated source; it is never a fallback after a failed compile. Two
such cases,
+ * both classes the JDK compiler cannot name that Janino's lenient
loader/lexer accepts:
+ *
+ * - REPL / interactive sessions (spark-shell `$line*` wrappers, Spark
Connect /
+ * Ammonite session artifacts): reachable only through a runtime class
loader and
+ * carrying self-inconsistent reflection metadata the JDK compiler
cannot resolve.
+ * This arm is context-wide by design: ALL codegen in such a session
routes to
+ * Janino, whether or not the unit references a REPL class, because the
reference
+ * cannot be told from the source text up front. See [[isReplContext]].
+ * - A reference to a class nested in a Scala `package object` (binary name
+ * `a.b.package$Inner`): `package` is a Java reserved word that cannot
be spelled as
+ * an identifier in any form - Java has no backtick/escape, unlike Scala
- so javac
+ * can neither parse `a.b.package.Inner` nor resolve the flat
`a.b.package$Inner`.
+ * See [[requiresJaninoSource]].
+ */
+ def active(code: CodeAndComment): CodeCompiler = {
+ val requested = SQLConf.get.codegenCompiler
+ if (requested != JANINO && isReplContext) {
+ logReplRoutingOnce()
+ JaninoCodeCompiler
+ } else if (requested != JANINO && requiresJaninoSource(code)) {
+ logPackageObjectRoutingOnce()
+ JaninoCodeCompiler
+ } else {
+ forBackend(requested)
+ }
+ }
+
+ // One-time visibility for the deterministic routing above: an operator who
set
+ // `jdk` should be able to tell from the logs why Janino still shows up.
+ private val replRoutingLogged = new
java.util.concurrent.atomic.AtomicBoolean(false)
+ private def logReplRoutingOnce(): Unit = {
+ if (replRoutingLogged.compareAndSet(false, true)) {
+ logInfo(log"REPL / interactive session context detected; codegen is
routed to " +
+ log"Janino although ${MDC(LogKeys.CONFIG,
SQLConf.CODEGEN_COMPILER.key)} " +
+ log"requests another backend (the JDK compiler cannot resolve
REPL-defined " +
+ log"classes). This notice is logged once per JVM.")
+ }
+ }
+ private val packageObjectRoutingLogged = new
java.util.concurrent.atomic.AtomicBoolean(false)
+ private def logPackageObjectRoutingOnce(): Unit = {
+ if (packageObjectRoutingLogged.compareAndSet(false, true)) {
+ logInfo(log"Generated code references a Scala package-object class; that
unit is " +
+ log"routed to Janino although ${MDC(LogKeys.CONFIG,
SQLConf.CODEGEN_COMPILER.key)} " +
+ log"requests another backend (`package` is a Java reserved word the
JDK compiler " +
+ log"cannot name). This notice is logged once per JVM.")
+ }
+ }
+
+ // A `package` segment in a qualified/binary class name - a Scala `package
object`'s nested
+ // class such as `a.b.package$Inner`. `package` is a Java reserved word the
JDK compiler can
+ // name in no form (parse error as `package.Inner`; unresolvable as the flat
`package$Inner`),
+ // whereas Janino's lexer scans `package$Inner` as one identifier.
+ //
+ // `package` is the only keyword scanned for, by design. It is the only Java
keyword the
+ // Scala compiler ever produces in a generated name (from `package object`);
a class named
+ // after any other keyword (`class int`) requires pathological user code. It
is also the only
+ // keyword that is *safe* to scan for: the rest (`int`, `new`, `this`,
`return`, `switch`, ...)
+ // occur as legitimate tokens throughout the generated Java, so matching
them would route
+ // almost all codegen to Janino, whereas a `package` token never appears in
a generated class
+ // body except as such a class reference. (A fully general check would
inspect the resolved
+ // class names rather than the source text, but that information is only
available during
+ // compilation, i.e. after the backend is already chosen.) The lookbehind
keeps a legal
+ // identifier like `mypackage$Inner` from matching; a false positive (e.g.
text inside a string
+ // literal) is harmless - it only picks Janino, a superset of what javac
accepts.
+ private val UnnameablePackageObjectClass = """(?<![\w$])package[.$]""".r
+ private def requiresJaninoSource(code: CodeAndComment): Boolean = {
+ // This runs on every compile() call (the result is part of the cache
key), so gate
+ // the regex scan behind an intrinsified substring search: generated
bodies almost
+ // never contain the literal `package` at all, and the regex runs only
when they do.
+ code != null && code.body.contains("package") &&
+ UnnameablePackageObjectClass.findFirstIn(code.body).isDefined
+ }
+
+ private val ExecutorClassLoaderName =
"org.apache.spark.executor.ExecutorClassLoader"
+
+ /**
+ * True when codegen is running in a REPL / interactive context, detected
via the three
+ * mechanisms Spark uses to ship such classes:
+ * - the active job/session carries a REPL or artifact class-dir URI
+ * ([[JobArtifactSet.getCurrentJobArtifactState]]'s `replClassDirUri`).
This is the
+ * canonical signal: Spark Connect sets it per session and spark-shell
falls back to
+ * it from `spark.repl.class.uri`. It is a thread-local set around both
driver-side
+ * and executor-side work, so it catches driver-side codegen where no
+ * `ExecutorClassLoader` is in the loader chain (e.g. a Connect UDF over
a local
+ * relation referencing an Ammonite `$sess` class); or
+ * - `spark.repl.class.uri` set in the active conf (spark-shell sets this
globally); or
+ * - an [[org.apache.spark.executor.ExecutorClassLoader]] somewhere in the
active
+ * class loader chain (created on executors when a session has such a
class URI).
+ *
+ * The default (non-REPL) job state has no `replClassDirUri`, so ordinary
codegen is not
+ * affected. The class loader is compared by class name rather than
`isInstanceOf` so
+ * catalyst need not depend on the `repl` module. Any reflection / lookup
failure
+ * conservatively reports `false`, which preserves the configured backend.
+ */
+ private def isReplContext: Boolean = {
+ def hasArtifactReplUri =
+ try
JobArtifactSet.getCurrentJobArtifactState.exists(_.replClassDirUri.isDefined)
+ catch { case NonFatal(_) => false }
+ def confHasReplUri =
+ try Option(SparkEnv.get).exists(_.conf.contains("spark.repl.class.uri"))
+ catch { case NonFatal(_) => false }
+ def eclInChain =
+ try {
+ var loader = Utils.getContextOrSparkClassLoader
+ var found = false
+ while (loader != null && !found) {
+ if (loader.getClass.getName == ExecutorClassLoaderName) found = true
+ loader = loader.getParent
+ }
+ found
+ } catch {
+ case NonFatal(_) => false
+ }
+ hasArtifactReplUri || confHasReplUri || eclInChain
+ }
+
+ /**
+ * Get the backend by name. SQLConf already validates the value via
`checkValues`
+ * at config-set time, so unknown names should not reach here in normal use;
+ * tests may call this directly. When `jdk` is requested but the JDK
compiler is
+ * not present at runtime (a JRE-only image), this logs a warning once and
falls
+ * back to Janino so the query does not fail.
+ */
+ private[codegen] def forBackend(requested: String): CodeCompiler = {
+ requested.toLowerCase(Locale.ROOT) match {
+ case JANINO => JaninoCodeCompiler
+ case JDK if JdkCodeCompiler.isAvailable => JdkCodeCompiler
+ case JDK =>
+ logJdkUnavailableOnce()
+ JaninoCodeCompiler
+ case other =>
+ throw new IllegalArgumentException(
+ s"Unknown ${SQLConf.CODEGEN_COMPILER.key} backend: $other " +
+ s"(supported: ${Seq(JANINO, JDK).mkString(", ")})")
+ }
+ }
+
+ private val jdkUnavailableWarned = new
java.util.concurrent.atomic.AtomicBoolean(false)
+ private def logJdkUnavailableOnce(): Unit = {
+ if (jdkUnavailableWarned.compareAndSet(false, true)) {
+ logWarning(log"${MDC(LogKeys.CONFIG, SQLConf.CODEGEN_COMPILER.key)}=jdk
requested " +
+ log"but javax.tools.JavaCompiler is not available on this runtime " +
+ log"(JRE-only image?). Falling back to Janino for this JVM.")
+ }
+ }
+
+ /**
+ * Compute bytecode statistics for a set of compiled classes. Both backends
+ * produce the same map shape (className -> classfile bytes), so the
analysis is
+ * shared. This is the only piece of code that depends on Janino's
+ * `commons-compiler` ClassFile parser; it can be swapped for ASM later
without
+ * touching either backend.
+ */
+ private[codegen] def computeByteCodeStats(
+ classBytecodes: Iterable[(String, Array[Byte])]): ByteCodeStats = {
+ val perClass = classBytecodes.map { case (_, classBytes) =>
+ val classCodeSize = classBytes.length
+ CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classCodeSize)
+ try {
+ val cf = new ClassFile(new ByteArrayInputStream(classBytes))
+ val constPoolSize = cf.getConstantPoolSize
+ val methodCodeSizes = cf.methodInfos.asScala.flatMap { method =>
+ method.getAttributes.collect { case attr: CodeAttribute =>
+ val byteCodeSize = attr.code.length
+
CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize)
+ if (byteCodeSize > CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT) {
+ logInfo(log"Generated method too long to be JIT compiled: " +
+ log"${MDC(LogKeys.CLASS_NAME, cf.getThisClassName)}." +
+ log"${MDC(LogKeys.METHOD_NAME, method.getName)} is " +
+ log"${MDC(LogKeys.BYTECODE_SIZE, byteCodeSize)} bytes")
+ }
+ byteCodeSize
+ }
+ }
+ // Use `maxOption` to handle classes with no methods (e.g., a synthetic
+ // module-info-style class). The original Janino-only code would have
raised
+ // UnsupportedOperationException there; we degrade gracefully to -1
instead.
+ (methodCodeSizes.maxOption.getOrElse(-1), constPoolSize)
+ } catch {
+ case NonFatal(e) =>
+ logWarning("Error calculating stats of compiled class.", e)
+ (-1, -1)
+ }
+ }
+
+ val (maxMethodSizes, constPoolSize) = perClass.unzip
+ ByteCodeStats(
+ maxMethodCodeSize = maxMethodSizes.maxOption.getOrElse(-1),
+ maxConstPoolSize = constPoolSize.maxOption.getOrElse(-1),
+ // Minus 2 for `GeneratedClass` and an outer-most generated class.
+ // Both backends wrap the class body in a single outer declaration, so
the
+ // emitted class count has the same shape (1 outer wrapper + K
user-declared
+ // classes) and the offset yields the same value under either backend.
+ // `max(0, ...)` keeps an unexpected emit shape from going negative.
+ numInnerClasses = math.max(0, classBytecodes.size - 2))
+ }
+
+ /**
+ * Log the generated source on a compilation failure. Behaviour matches the
+ * original [[CodeGenerator]] implementation. `maxLines` (the session's
+ * `loggingMaxLinesForCodegen`) is captured by the CALLER: the JDK backend
invokes
+ * this from its compile worker thread, where `SQLConf.get` would silently
return
+ * the default conf instead of the calling session's.
+ */
+ private[codegen] def logGeneratedCodeOnFailure(code: CodeAndComment,
maxLines: Int): Unit = {
+ val formatted = s"\n${CodeFormatter.format(code, maxLines)}"
+ if (Utils.isTesting) {
+ logError(formatted)
+ } else {
+ logInfo(formatted)
+ }
+ }
+}
+
+/**
+ * The default backend using Janino's `ClassBodyEvaluator`.
+ *
+ * This lifts the previous body of `CodeGenerator.doCompile`, with the only
+ * changes being: imports moved to `CodeCompiler.DefaultImports`, stats
+ * computation moved to `CodeCompiler.computeByteCodeStats` (which degrades to
+ * `-1` for a class with no methods instead of throwing; see its comment).
+ * Behaviour is otherwise preserved, including the [[ParentClassLoader]]
wrapping
+ * (workaround for the Janino `findIClass` behaviour described in SPARK-15622 /
+ * SPARK-11636).
+ */
+object JaninoCodeCompiler extends CodeCompiler with Logging {
+
+ override val name: String = CodeCompiler.JANINO
+
+ // Route source-code/debug log emissions under CodeGenerator's logger name.
+ override protected def logName: String = classOf[CodeGenerator[_, _]].getName
+
+ override def compile(code: CodeAndComment): (GeneratedClass, ByteCodeStats)
= {
+ val evaluator = new ClassBodyEvaluator()
+
+ // See SPARK-15622 / SPARK-11636 for why this wrapping is required.
+ val parentClassLoader = new
ParentClassLoader(Utils.getContextOrSparkClassLoader)
+ evaluator.setParentClassLoader(parentClassLoader)
+ evaluator.setClassName(CodeCompiler.GeneratedClassName)
+ evaluator.setDefaultImports(CodeCompiler.DefaultImports: _*)
+ evaluator.setExtendedClass(classOf[GeneratedClass])
+
+ logBasedOnLevel(SQLConf.get.codegenLogLevel) {
+ // Only add extra debugging info to byte code when we are going to print
the source code.
+ evaluator.setDebuggingInformation(true, true, false)
+ log"\n${MDC(LogKeys.CODE, CodeFormatter.format(code))}"
+ }
+
+ val codeStats =
+ try {
+ evaluator.cook("generated.java", code.body)
+ CodeCompiler.computeByteCodeStats(evaluator.getBytecodes.asScala)
+ } catch {
+ case e: InternalCompilerException =>
+ logError("Failed to compile the generated Java code.", e)
+ CodeCompiler.logGeneratedCodeOnFailure(code,
SQLConf.get.loggingMaxLinesForCodegen)
+ throw QueryExecutionErrors.internalCompilerError(e)
+ case e: CompileException =>
+ logError("Failed to compile the generated Java code.", e)
+ CodeCompiler.logGeneratedCodeOnFailure(code,
SQLConf.get.loggingMaxLinesForCodegen)
+ throw QueryExecutionErrors.compilerError(e)
+ }
+
+
(evaluator.getClazz().getConstructor().newInstance().asInstanceOf[GeneratedClass],
codeStats)
+ }
+}
+
+/**
+ * Alternative backend using the JDK's `javax.tools.JavaCompiler`.
+ *
+ * Wraps the generator-produced class body in a synthesized compilation unit
+ * (package + imports + `public class GeneratedClass extends ...`) before
+ * handing it to the compiler. Compiled classes are captured in memory and
+ * loaded through a private [[ClassLoader]] that mirrors the Janino backend's
+ * [[ParentClassLoader]] wrapping (SPARK-15622 / SPARK-11636) so behaviour on
+ * containerised deployments stays consistent.
+ *
+ * Class resolution: referenced classes are resolved through the task's context
+ * [[ClassLoader]] (see [[ClassLoaderFileManager]]) rather than a file-based
+ * `-classpath`, mirroring how Janino resolves them. This lets the JDK
compiler see
+ * classes that exist only on a runtime loader - REPL-generated, Spark Connect
+ * session artifacts - and avoids handing javac a giant classpath to index.
+ *
+ * Threading: the actual javac invocation runs on a dedicated single-threaded
+ * executor (see `compileExecutor`). This is required for correctness on Spark
+ * task threads (jar reads through interruptible NIO channels vs. task
+ * interruption), and it also confines the shared, not-thread-safe
+ * [[StandardJavaFileManager]] (used for platform classes and output) to one
Review Comment:
Compiled output never reaches the shared file manager —
`getJavaFileForOutput` captures it in `InMemoryClassFile` objects without
delegating. The same claim appears at line 1032-1033 ("and compiled output
still go through the wrapped `StandardJavaFileManager`").
```suggestion
* [[StandardJavaFileManager]] (used for platform-class lookups) to one
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeCompiler.scala:
##########
@@ -0,0 +1,1281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream,
IOException, StringWriter}
+import java.net.{JarURLConnection, URI, URL}
+import java.util.Locale
+import java.util.concurrent.{Callable, ExecutionException, ExecutorService}
+import javax.tools.{Diagnostic, DiagnosticCollector, FileObject,
ForwardingJavaFileManager, JavaCompiler, JavaFileManager, JavaFileObject,
SimpleJavaFileObject, StandardJavaFileManager, StandardLocation, ToolProvider}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+import scala.util.control.NonFatal
+
+import com.google.common.cache.{Cache, CacheBuilder}
+import com.google.common.util.concurrent.Uninterruptibles
+import org.codehaus.commons.compiler.{CompileException,
InternalCompilerException}
+import org.codehaus.janino.ClassBodyEvaluator
+import org.codehaus.janino.util.ClassFile
+import org.codehaus.janino.util.ClassFile.CodeAttribute
+
+import org.apache.spark.{JobArtifactSet, SparkEnv, TaskContext,
TaskKilledException}
+import org.apache.spark.executor.InputMetrics
+import org.apache.spark.internal.{Logging, LogKeys}
+import org.apache.spark.metrics.source.CodegenMetrics
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeArrayData,
UnsafeMapData, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.{ArrayData,
CollationAwareUTF8String, CollationFactory, CollationSupport, MapData}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.Decimal
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.types.{BinaryView, CalendarInterval,
TimestampNanosVal, UTF8String, VariantVal}
+import org.apache.spark.util.{ParentClassLoader, ThreadUtils, Utils}
+
+/**
+ * Backend used to compile generator-produced Java source into a
[[GeneratedClass]].
+ *
+ * Two implementations are provided:
+ * - [[JaninoCodeCompiler]]: the default, uses Janino's
`ClassBodyEvaluator`. Very fast.
+ * - [[JdkCodeCompiler]]: uses `javax.tools.JavaCompiler` from the JDK.
Slower (~5x for
+ * large generated units, 30-300x for small ones), but maintained on the
JDK
+ * release cadence and not subject to Janino's unmaintained-upstream risk.
+ *
+ * The backend is selected at compile time via [[SQLConf.CODEGEN_COMPILER]].
+ */
+trait CodeCompiler {
+ /** Backend name as used in `spark.sql.codegen.compiler`. */
+ def name: String
+
+ /**
+ * Compile a generator-produced class body into an instance of the
+ * [[GeneratedClass]] subclass it defines.
+ *
+ * @return the instantiated generated class along with bytecode statistics.
+ */
+ def compile(code: CodeAndComment): (GeneratedClass, ByteCodeStats)
+}
+
+object CodeCompiler extends Logging {
+
+ // Emit log messages under CodeGenerator's logger name: operators and tests
+ // (SPARK-25113 / SPARK-51527) subscribe to that exact logger for codegen
+ // compilation events, and the backends are implementation details of
+ // `CodeGenerator.compile`, so their logs belong under its name.
+ override protected def logName: String = classOf[CodeGenerator[_, _]].getName
+
+ val JANINO: String = "janino"
+ val JDK: String = "jdk"
+
+ /**
+ * Fully-qualified imports made available to generated code by both backends.
+ *
+ * For Janino these are passed to `ClassBodyEvaluator.setDefaultImports`.
+ * For the JDK backend they are rendered into `import` statements inside the
+ * synthesized compilation unit.
+ *
+ * This is the single shared list - anything added here automatically
applies to
+ * both backends. It intentionally excludes
+ * `org.apache.spark.sql.catalyst.expressions.codegen.GeneratedClass` to
avoid
+ * a name collision with the generated subclass `GeneratedClass`; the extends
+ * clause uses the fully-qualified name instead.
+ *
+ * When adding an entry, keep its SIMPLE name distinct from any `import`
line a
+ * generator emits at the top of a class body (currently only
GenerateColumnAccessor
+ * does this): javac rejects two single-type imports sharing a simple name
+ * (JLS 7.5.1) while Janino resolves them leniently, so a collision would
fail only
+ * under the JDK backend.
+ */
+ val DefaultImports: Seq[String] = Seq(
+ classOf[Platform].getName,
+ classOf[InternalRow].getName,
+ classOf[UnsafeRow].getName,
+ classOf[BinaryView].getName,
+ classOf[UTF8String].getName,
+ classOf[Decimal].getName,
+ classOf[CalendarInterval].getName,
+ classOf[TimestampNanosVal].getName,
+ classOf[VariantVal].getName,
+ classOf[ArrayData].getName,
+ classOf[UnsafeArrayData].getName,
+ classOf[MapData].getName,
+ classOf[UnsafeMapData].getName,
+ classOf[Expression].getName,
+ classOf[TaskContext].getName,
+ classOf[TaskKilledException].getName,
+ classOf[InputMetrics].getName,
+ classOf[CollationAwareUTF8String].getName,
+ classOf[CollationFactory].getName,
+ classOf[CollationSupport].getName,
+ QueryExecutionErrors.getClass.getName.stripSuffix("$")
+ )
+
+ /**
+ * FQN of the generated class. Must NOT be under the `codegen` package or
Janino
+ * fails with `java.lang.InstantiationException`. The same name is used for
both
+ * backends so generated source, logs, and diagnostics name the same class
+ * whichever backend compiles it. (Compiled results are NOT shared across
+ * backends: the compile cache key includes the backend.)
+ */
+ val GeneratedClassName: String =
+ "org.apache.spark.sql.catalyst.expressions.GeneratedClass"
+
+ def active(): CodeCompiler = active(null)
+
+ /**
+ * Resolve the active backend for the given generated unit.
+ *
+ * The configured backend ([[SQLConf.CODEGEN_COMPILER]]) governs ordinary
codegen. The
+ * exception is codegen the JDK compiler is fundamentally *incapable* of
compiling - not
+ * merely slower at - which is always routed to Janino regardless of the
configured
+ * backend. This is deterministic routing decided up front from the
execution context and
+ * the generated source; it is never a fallback after a failed compile. Two
such cases,
+ * both classes the JDK compiler cannot name that Janino's lenient
loader/lexer accepts:
+ *
+ * - REPL / interactive sessions (spark-shell `$line*` wrappers, Spark
Connect /
+ * Ammonite session artifacts): reachable only through a runtime class
loader and
+ * carrying self-inconsistent reflection metadata the JDK compiler
cannot resolve.
+ * This arm is context-wide by design: ALL codegen in such a session
routes to
+ * Janino, whether or not the unit references a REPL class, because the
reference
+ * cannot be told from the source text up front. See [[isReplContext]].
+ * - A reference to a class nested in a Scala `package object` (binary name
+ * `a.b.package$Inner`): `package` is a Java reserved word that cannot
be spelled as
+ * an identifier in any form - Java has no backtick/escape, unlike Scala
- so javac
+ * can neither parse `a.b.package.Inner` nor resolve the flat
`a.b.package$Inner`.
+ * See [[requiresJaninoSource]].
+ */
+ def active(code: CodeAndComment): CodeCompiler = {
+ val requested = SQLConf.get.codegenCompiler
+ if (requested != JANINO && isReplContext) {
+ logReplRoutingOnce()
+ JaninoCodeCompiler
+ } else if (requested != JANINO && requiresJaninoSource(code)) {
+ logPackageObjectRoutingOnce()
+ JaninoCodeCompiler
+ } else {
+ forBackend(requested)
+ }
+ }
+
+ // One-time visibility for the deterministic routing above: an operator who
set
+ // `jdk` should be able to tell from the logs why Janino still shows up.
+ private val replRoutingLogged = new
java.util.concurrent.atomic.AtomicBoolean(false)
+ private def logReplRoutingOnce(): Unit = {
+ if (replRoutingLogged.compareAndSet(false, true)) {
+ logInfo(log"REPL / interactive session context detected; codegen is
routed to " +
+ log"Janino although ${MDC(LogKeys.CONFIG,
SQLConf.CODEGEN_COMPILER.key)} " +
+ log"requests another backend (the JDK compiler cannot resolve
REPL-defined " +
+ log"classes). This notice is logged once per JVM.")
+ }
+ }
+ private val packageObjectRoutingLogged = new
java.util.concurrent.atomic.AtomicBoolean(false)
+ private def logPackageObjectRoutingOnce(): Unit = {
+ if (packageObjectRoutingLogged.compareAndSet(false, true)) {
+ logInfo(log"Generated code references a Scala package-object class; that
unit is " +
+ log"routed to Janino although ${MDC(LogKeys.CONFIG,
SQLConf.CODEGEN_COMPILER.key)} " +
+ log"requests another backend (`package` is a Java reserved word the
JDK compiler " +
+ log"cannot name). This notice is logged once per JVM.")
+ }
+ }
+
+ // A `package` segment in a qualified/binary class name - a Scala `package
object`'s nested
+ // class such as `a.b.package$Inner`. `package` is a Java reserved word the
JDK compiler can
+ // name in no form (parse error as `package.Inner`; unresolvable as the flat
`package$Inner`),
+ // whereas Janino's lexer scans `package$Inner` as one identifier.
+ //
+ // `package` is the only keyword scanned for, by design. It is the only Java
keyword the
+ // Scala compiler ever produces in a generated name (from `package object`);
a class named
+ // after any other keyword (`class int`) requires pathological user code. It
is also the only
+ // keyword that is *safe* to scan for: the rest (`int`, `new`, `this`,
`return`, `switch`, ...)
+ // occur as legitimate tokens throughout the generated Java, so matching
them would route
+ // almost all codegen to Janino, whereas a `package` token never appears in
a generated class
+ // body except as such a class reference. (A fully general check would
inspect the resolved
+ // class names rather than the source text, but that information is only
available during
+ // compilation, i.e. after the backend is already chosen.) The lookbehind
keeps a legal
+ // identifier like `mypackage$Inner` from matching; a false positive (e.g.
text inside a string
+ // literal) is harmless - it only picks Janino, a superset of what javac
accepts.
+ private val UnnameablePackageObjectClass = """(?<![\w$])package[.$]""".r
+ private def requiresJaninoSource(code: CodeAndComment): Boolean = {
+ // This runs on every compile() call (the result is part of the cache
key), so gate
+ // the regex scan behind an intrinsified substring search: generated
bodies almost
+ // never contain the literal `package` at all, and the regex runs only
when they do.
+ code != null && code.body.contains("package") &&
+ UnnameablePackageObjectClass.findFirstIn(code.body).isDefined
+ }
+
+ private val ExecutorClassLoaderName =
"org.apache.spark.executor.ExecutorClassLoader"
+
+ /**
+ * True when codegen is running in a REPL / interactive context, detected
via the three
+ * mechanisms Spark uses to ship such classes:
+ * - the active job/session carries a REPL or artifact class-dir URI
+ * ([[JobArtifactSet.getCurrentJobArtifactState]]'s `replClassDirUri`).
This is the
+ * canonical signal: Spark Connect sets it per session and spark-shell
falls back to
+ * it from `spark.repl.class.uri`. It is a thread-local set around both
driver-side
+ * and executor-side work, so it catches driver-side codegen where no
+ * `ExecutorClassLoader` is in the loader chain (e.g. a Connect UDF over
a local
+ * relation referencing an Ammonite `$sess` class); or
+ * - `spark.repl.class.uri` set in the active conf (spark-shell sets this
globally); or
+ * - an [[org.apache.spark.executor.ExecutorClassLoader]] somewhere in the
active
+ * class loader chain (created on executors when a session has such a
class URI).
+ *
+ * The default (non-REPL) job state has no `replClassDirUri`, so ordinary
codegen is not
+ * affected. The class loader is compared by class name rather than
`isInstanceOf` so
+ * catalyst need not depend on the `repl` module. Any reflection / lookup
failure
+ * conservatively reports `false`, which preserves the configured backend.
+ */
+ private def isReplContext: Boolean = {
+ def hasArtifactReplUri =
+ try
JobArtifactSet.getCurrentJobArtifactState.exists(_.replClassDirUri.isDefined)
+ catch { case NonFatal(_) => false }
+ def confHasReplUri =
+ try Option(SparkEnv.get).exists(_.conf.contains("spark.repl.class.uri"))
+ catch { case NonFatal(_) => false }
+ def eclInChain =
+ try {
+ var loader = Utils.getContextOrSparkClassLoader
+ var found = false
+ while (loader != null && !found) {
+ if (loader.getClass.getName == ExecutorClassLoaderName) found = true
Review Comment:
`ExecutorClassLoader` lives in core (`org.apache.spark.executor`), which
catalyst already depends on, so the repl-module rationale in the comment above
is stale — this can be `loader.isInstanceOf[ExecutorClassLoader]`, which also
survives a rename. If the string compare is deliberate for another reason,
could you state that reason in the comment instead?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeCompiler.scala:
##########
@@ -0,0 +1,1281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream,
IOException, StringWriter}
+import java.net.{JarURLConnection, URI, URL}
+import java.util.Locale
+import java.util.concurrent.{Callable, ExecutionException, ExecutorService}
+import javax.tools.{Diagnostic, DiagnosticCollector, FileObject,
ForwardingJavaFileManager, JavaCompiler, JavaFileManager, JavaFileObject,
SimpleJavaFileObject, StandardJavaFileManager, StandardLocation, ToolProvider}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+import scala.util.control.NonFatal
+
+import com.google.common.cache.{Cache, CacheBuilder}
+import com.google.common.util.concurrent.Uninterruptibles
+import org.codehaus.commons.compiler.{CompileException,
InternalCompilerException}
+import org.codehaus.janino.ClassBodyEvaluator
+import org.codehaus.janino.util.ClassFile
+import org.codehaus.janino.util.ClassFile.CodeAttribute
+
+import org.apache.spark.{JobArtifactSet, SparkEnv, TaskContext,
TaskKilledException}
+import org.apache.spark.executor.InputMetrics
+import org.apache.spark.internal.{Logging, LogKeys}
+import org.apache.spark.metrics.source.CodegenMetrics
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeArrayData,
UnsafeMapData, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.{ArrayData,
CollationAwareUTF8String, CollationFactory, CollationSupport, MapData}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.Decimal
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.types.{BinaryView, CalendarInterval,
TimestampNanosVal, UTF8String, VariantVal}
+import org.apache.spark.util.{ParentClassLoader, ThreadUtils, Utils}
+
+/**
+ * Backend used to compile generator-produced Java source into a
[[GeneratedClass]].
+ *
+ * Two implementations are provided:
+ * - [[JaninoCodeCompiler]]: the default, uses Janino's
`ClassBodyEvaluator`. Very fast.
+ * - [[JdkCodeCompiler]]: uses `javax.tools.JavaCompiler` from the JDK.
Slower (~5x for
+ * large generated units, 30-300x for small ones), but maintained on the
JDK
+ * release cadence and not subject to Janino's unmaintained-upstream risk.
+ *
+ * The backend is selected at compile time via [[SQLConf.CODEGEN_COMPILER]].
+ */
+trait CodeCompiler {
+ /** Backend name as used in `spark.sql.codegen.compiler`. */
+ def name: String
+
+ /**
+ * Compile a generator-produced class body into an instance of the
+ * [[GeneratedClass]] subclass it defines.
+ *
+ * @return the instantiated generated class along with bytecode statistics.
+ */
+ def compile(code: CodeAndComment): (GeneratedClass, ByteCodeStats)
+}
+
+object CodeCompiler extends Logging {
+
+ // Emit log messages under CodeGenerator's logger name: operators and tests
+ // (SPARK-25113 / SPARK-51527) subscribe to that exact logger for codegen
+ // compilation events, and the backends are implementation details of
+ // `CodeGenerator.compile`, so their logs belong under its name.
+ override protected def logName: String = classOf[CodeGenerator[_, _]].getName
+
+ val JANINO: String = "janino"
+ val JDK: String = "jdk"
+
+ /**
+ * Fully-qualified imports made available to generated code by both backends.
+ *
+ * For Janino these are passed to `ClassBodyEvaluator.setDefaultImports`.
+ * For the JDK backend they are rendered into `import` statements inside the
+ * synthesized compilation unit.
+ *
+ * This is the single shared list - anything added here automatically
applies to
+ * both backends. It intentionally excludes
+ * `org.apache.spark.sql.catalyst.expressions.codegen.GeneratedClass` to
avoid
+ * a name collision with the generated subclass `GeneratedClass`; the extends
+ * clause uses the fully-qualified name instead.
+ *
+ * When adding an entry, keep its SIMPLE name distinct from any `import`
line a
+ * generator emits at the top of a class body (currently only
GenerateColumnAccessor
+ * does this): javac rejects two single-type imports sharing a simple name
+ * (JLS 7.5.1) while Janino resolves them leniently, so a collision would
fail only
+ * under the JDK backend.
+ */
+ val DefaultImports: Seq[String] = Seq(
+ classOf[Platform].getName,
+ classOf[InternalRow].getName,
+ classOf[UnsafeRow].getName,
+ classOf[BinaryView].getName,
+ classOf[UTF8String].getName,
+ classOf[Decimal].getName,
+ classOf[CalendarInterval].getName,
+ classOf[TimestampNanosVal].getName,
+ classOf[VariantVal].getName,
+ classOf[ArrayData].getName,
+ classOf[UnsafeArrayData].getName,
+ classOf[MapData].getName,
+ classOf[UnsafeMapData].getName,
+ classOf[Expression].getName,
+ classOf[TaskContext].getName,
+ classOf[TaskKilledException].getName,
+ classOf[InputMetrics].getName,
+ classOf[CollationAwareUTF8String].getName,
+ classOf[CollationFactory].getName,
+ classOf[CollationSupport].getName,
+ QueryExecutionErrors.getClass.getName.stripSuffix("$")
+ )
+
+ /**
+ * FQN of the generated class. Must NOT be under the `codegen` package or
Janino
+ * fails with `java.lang.InstantiationException`. The same name is used for
both
+ * backends so generated source, logs, and diagnostics name the same class
+ * whichever backend compiles it. (Compiled results are NOT shared across
+ * backends: the compile cache key includes the backend.)
+ */
+ val GeneratedClassName: String =
+ "org.apache.spark.sql.catalyst.expressions.GeneratedClass"
+
+ def active(): CodeCompiler = active(null)
+
+ /**
+ * Resolve the active backend for the given generated unit.
+ *
+ * The configured backend ([[SQLConf.CODEGEN_COMPILER]]) governs ordinary
codegen. The
+ * exception is codegen the JDK compiler is fundamentally *incapable* of
compiling - not
+ * merely slower at - which is always routed to Janino regardless of the
configured
+ * backend. This is deterministic routing decided up front from the
execution context and
+ * the generated source; it is never a fallback after a failed compile. Two
such cases,
+ * both classes the JDK compiler cannot name that Janino's lenient
loader/lexer accepts:
+ *
+ * - REPL / interactive sessions (spark-shell `$line*` wrappers, Spark
Connect /
+ * Ammonite session artifacts): reachable only through a runtime class
loader and
+ * carrying self-inconsistent reflection metadata the JDK compiler
cannot resolve.
+ * This arm is context-wide by design: ALL codegen in such a session
routes to
+ * Janino, whether or not the unit references a REPL class, because the
reference
+ * cannot be told from the source text up front. See [[isReplContext]].
+ * - A reference to a class nested in a Scala `package object` (binary name
+ * `a.b.package$Inner`): `package` is a Java reserved word that cannot
be spelled as
+ * an identifier in any form - Java has no backtick/escape, unlike Scala
- so javac
+ * can neither parse `a.b.package.Inner` nor resolve the flat
`a.b.package$Inner`.
+ * See [[requiresJaninoSource]].
+ */
+ def active(code: CodeAndComment): CodeCompiler = {
+ val requested = SQLConf.get.codegenCompiler
+ if (requested != JANINO && isReplContext) {
+ logReplRoutingOnce()
+ JaninoCodeCompiler
+ } else if (requested != JANINO && requiresJaninoSource(code)) {
+ logPackageObjectRoutingOnce()
+ JaninoCodeCompiler
+ } else {
+ forBackend(requested)
+ }
+ }
+
+ // One-time visibility for the deterministic routing above: an operator who
set
+ // `jdk` should be able to tell from the logs why Janino still shows up.
+ private val replRoutingLogged = new
java.util.concurrent.atomic.AtomicBoolean(false)
+ private def logReplRoutingOnce(): Unit = {
+ if (replRoutingLogged.compareAndSet(false, true)) {
+ logInfo(log"REPL / interactive session context detected; codegen is
routed to " +
+ log"Janino although ${MDC(LogKeys.CONFIG,
SQLConf.CODEGEN_COMPILER.key)} " +
+ log"requests another backend (the JDK compiler cannot resolve
REPL-defined " +
+ log"classes). This notice is logged once per JVM.")
+ }
+ }
+ private val packageObjectRoutingLogged = new
java.util.concurrent.atomic.AtomicBoolean(false)
+ private def logPackageObjectRoutingOnce(): Unit = {
+ if (packageObjectRoutingLogged.compareAndSet(false, true)) {
+ logInfo(log"Generated code references a Scala package-object class; that
unit is " +
+ log"routed to Janino although ${MDC(LogKeys.CONFIG,
SQLConf.CODEGEN_COMPILER.key)} " +
+ log"requests another backend (`package` is a Java reserved word the
JDK compiler " +
+ log"cannot name). This notice is logged once per JVM.")
+ }
+ }
+
+ // A `package` segment in a qualified/binary class name - a Scala `package
object`'s nested
+ // class such as `a.b.package$Inner`. `package` is a Java reserved word the
JDK compiler can
+ // name in no form (parse error as `package.Inner`; unresolvable as the flat
`package$Inner`),
+ // whereas Janino's lexer scans `package$Inner` as one identifier.
+ //
+ // `package` is the only keyword scanned for, by design. It is the only Java
keyword the
+ // Scala compiler ever produces in a generated name (from `package object`);
a class named
+ // after any other keyword (`class int`) requires pathological user code. It
is also the only
+ // keyword that is *safe* to scan for: the rest (`int`, `new`, `this`,
`return`, `switch`, ...)
+ // occur as legitimate tokens throughout the generated Java, so matching
them would route
+ // almost all codegen to Janino, whereas a `package` token never appears in
a generated class
+ // body except as such a class reference. (A fully general check would
inspect the resolved
+ // class names rather than the source text, but that information is only
available during
+ // compilation, i.e. after the backend is already chosen.) The lookbehind
keeps a legal
+ // identifier like `mypackage$Inner` from matching; a false positive (e.g.
text inside a string
+ // literal) is harmless - it only picks Janino, a superset of what javac
accepts.
+ private val UnnameablePackageObjectClass = """(?<![\w$])package[.$]""".r
+ private def requiresJaninoSource(code: CodeAndComment): Boolean = {
+ // This runs on every compile() call (the result is part of the cache
key), so gate
+ // the regex scan behind an intrinsified substring search: generated
bodies almost
+ // never contain the literal `package` at all, and the regex runs only
when they do.
+ code != null && code.body.contains("package") &&
+ UnnameablePackageObjectClass.findFirstIn(code.body).isDefined
+ }
+
+ private val ExecutorClassLoaderName =
"org.apache.spark.executor.ExecutorClassLoader"
+
+ /**
+ * True when codegen is running in a REPL / interactive context, detected
via the three
+ * mechanisms Spark uses to ship such classes:
+ * - the active job/session carries a REPL or artifact class-dir URI
+ * ([[JobArtifactSet.getCurrentJobArtifactState]]'s `replClassDirUri`).
This is the
+ * canonical signal: Spark Connect sets it per session and spark-shell
falls back to
+ * it from `spark.repl.class.uri`. It is a thread-local set around both
driver-side
+ * and executor-side work, so it catches driver-side codegen where no
+ * `ExecutorClassLoader` is in the loader chain (e.g. a Connect UDF over
a local
+ * relation referencing an Ammonite `$sess` class); or
+ * - `spark.repl.class.uri` set in the active conf (spark-shell sets this
globally); or
+ * - an [[org.apache.spark.executor.ExecutorClassLoader]] somewhere in the
active
+ * class loader chain (created on executors when a session has such a
class URI).
+ *
+ * The default (non-REPL) job state has no `replClassDirUri`, so ordinary
codegen is not
+ * affected. The class loader is compared by class name rather than
`isInstanceOf` so
+ * catalyst need not depend on the `repl` module. Any reflection / lookup
failure
+ * conservatively reports `false`, which preserves the configured backend.
+ */
+ private def isReplContext: Boolean = {
+ def hasArtifactReplUri =
+ try
JobArtifactSet.getCurrentJobArtifactState.exists(_.replClassDirUri.isDefined)
+ catch { case NonFatal(_) => false }
+ def confHasReplUri =
+ try Option(SparkEnv.get).exists(_.conf.contains("spark.repl.class.uri"))
+ catch { case NonFatal(_) => false }
+ def eclInChain =
+ try {
+ var loader = Utils.getContextOrSparkClassLoader
+ var found = false
+ while (loader != null && !found) {
+ if (loader.getClass.getName == ExecutorClassLoaderName) found = true
+ loader = loader.getParent
+ }
+ found
+ } catch {
+ case NonFatal(_) => false
+ }
+ hasArtifactReplUri || confHasReplUri || eclInChain
+ }
+
+ /**
+ * Get the backend by name. SQLConf already validates the value via
`checkValues`
+ * at config-set time, so unknown names should not reach here in normal use;
+ * tests may call this directly. When `jdk` is requested but the JDK
compiler is
+ * not present at runtime (a JRE-only image), this logs a warning once and
falls
+ * back to Janino so the query does not fail.
+ */
+ private[codegen] def forBackend(requested: String): CodeCompiler = {
+ requested.toLowerCase(Locale.ROOT) match {
+ case JANINO => JaninoCodeCompiler
+ case JDK if JdkCodeCompiler.isAvailable => JdkCodeCompiler
+ case JDK =>
+ logJdkUnavailableOnce()
+ JaninoCodeCompiler
+ case other =>
+ throw new IllegalArgumentException(
+ s"Unknown ${SQLConf.CODEGEN_COMPILER.key} backend: $other " +
+ s"(supported: ${Seq(JANINO, JDK).mkString(", ")})")
+ }
+ }
+
+ private val jdkUnavailableWarned = new
java.util.concurrent.atomic.AtomicBoolean(false)
+ private def logJdkUnavailableOnce(): Unit = {
+ if (jdkUnavailableWarned.compareAndSet(false, true)) {
+ logWarning(log"${MDC(LogKeys.CONFIG, SQLConf.CODEGEN_COMPILER.key)}=jdk
requested " +
+ log"but javax.tools.JavaCompiler is not available on this runtime " +
+ log"(JRE-only image?). Falling back to Janino for this JVM.")
+ }
+ }
+
+ /**
+ * Compute bytecode statistics for a set of compiled classes. Both backends
+ * produce the same map shape (className -> classfile bytes), so the
analysis is
+ * shared. This is the only piece of code that depends on Janino's
+ * `commons-compiler` ClassFile parser; it can be swapped for ASM later
without
Review Comment:
`org.codehaus.janino.util.ClassFile` ships in the `janino` artifact;
`commons-compiler` contains no `util/ClassFile`.
```suggestion
* `ClassFile` parser (from the `janino` artifact); it can be swapped for
ASM later without
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]