This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new e3b031276e4 [SPARK-44634][SQL] Encoders.bean does no longer support nested beans with type arguments e3b031276e4 is described below commit e3b031276e4ab626f7db7b8d95f01a598e25a6b1 Author: Giambattista Bloisi <giambattista.blo...@openaire.eu> AuthorDate: Sun Aug 6 21:47:57 2023 +0200 [SPARK-44634][SQL] Encoders.bean does no longer support nested beans with type arguments ### What changes were proposed in this pull request? This PR fixes a regression introduced in Spark 3.4.x where Encoders.bean is no longer able to process nested beans having type arguments. For example: ``` class A<T> { T value; // value getter and setter } class B { A<String> stringHolder; // stringHolder getter and setter } Encoders.bean(B.class); // throws "SparkUnsupportedOperationException: [ENCODER_NOT_FOUND]..." ``` ### Why are the changes needed? JavaTypeInference.encoderFor's main match does not handle the ParameterizedType and TypeVariable cases. I think this is a regression introduced after removing the use of Guava's TypeToken: [SPARK-42093 SQL Move JavaTypeInference to AgnosticEncoders](https://github.com/apache/spark/commit/18672003513d5a4aa610b6b94dbbc15c33185d3#diff-1191737b908340a2f4c22b71b1c40ebaa0da9d8b40c958089c346a3bda26943b) hvanhovell cloud-fan In this PR I'm leveraging commons-lang3 TypeUtils functionality to resolve ParameterizedType type arguments for classes. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests have been extended to check correct encoding of a nested bean having type arguments. Closes #42327 from gbloisi-openaire/spark-44634. 
Authored-by: Giambattista Bloisi <giambattista.blo...@openaire.eu> Signed-off-by: Herman van Hovell <her...@databricks.com> (cherry picked from commit d6998979427b6ad3a0f16d6966b3927d40440a60) Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../spark/sql/catalyst/JavaTypeInference.scala | 84 +++++----------------- .../spark/sql/catalyst/JavaBeanWithGenerics.java | 41 +++++++++++ .../sql/catalyst/JavaTypeInferenceSuite.scala | 4 ++ 3 files changed, 64 insertions(+), 65 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala index f352d28a7b5..3d536b735db 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala @@ -18,12 +18,14 @@ package org.apache.spark.sql.catalyst import java.beans.{Introspector, PropertyDescriptor} import java.lang.reflect.{ParameterizedType, Type, TypeVariable} -import java.util.{ArrayDeque, List => JList, Map => JMap} +import java.util.{List => JList, Map => JMap} import javax.annotation.Nonnull -import scala.annotation.tailrec +import scala.collection.JavaConverters._ import scala.reflect.ClassTag +import org.apache.commons.lang3.reflect.{TypeUtils => JavaTypeUtils} + import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, DayTimeIntervalEncoder, DEFAULT_JAVA_DECIMAL_ENCODER, EncoderField, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaEnumEncoder, LocalDateTimeEncoder, MapEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, P [...] 
import org.apache.spark.sql.errors.ExecutionErrors @@ -57,7 +59,8 @@ object JavaTypeInference { encoderFor(beanType, Set.empty).asInstanceOf[AgnosticEncoder[T]] } - private def encoderFor(t: Type, seenTypeSet: Set[Class[_]]): AgnosticEncoder[_] = t match { + private def encoderFor(t: Type, seenTypeSet: Set[Class[_]], + typeVariables: Map[TypeVariable[_], Type] = Map.empty): AgnosticEncoder[_] = t match { case c: Class[_] if c == java.lang.Boolean.TYPE => PrimitiveBooleanEncoder case c: Class[_] if c == java.lang.Byte.TYPE => PrimitiveByteEncoder @@ -101,18 +104,24 @@ object JavaTypeInference { UDTEncoder(udt, udt.getClass) case c: Class[_] if c.isArray => - val elementEncoder = encoderFor(c.getComponentType, seenTypeSet) + val elementEncoder = encoderFor(c.getComponentType, seenTypeSet, typeVariables) ArrayEncoder(elementEncoder, elementEncoder.nullable) - case ImplementsList(c, Array(elementCls)) => - val element = encoderFor(elementCls, seenTypeSet) + case c: Class[_] if classOf[JList[_]].isAssignableFrom(c) => + val element = encoderFor(c.getTypeParameters.array(0), seenTypeSet, typeVariables) IterableEncoder(ClassTag(c), element, element.nullable, lenientSerialization = false) - case ImplementsMap(c, Array(keyCls, valueCls)) => - val keyEncoder = encoderFor(keyCls, seenTypeSet) - val valueEncoder = encoderFor(valueCls, seenTypeSet) + case c: Class[_] if classOf[JMap[_, _]].isAssignableFrom(c) => + val keyEncoder = encoderFor(c.getTypeParameters.array(0), seenTypeSet, typeVariables) + val valueEncoder = encoderFor(c.getTypeParameters.array(1), seenTypeSet, typeVariables) MapEncoder(ClassTag(c), keyEncoder, valueEncoder, valueEncoder.nullable) + case tv: TypeVariable[_] => + encoderFor(typeVariables(tv), seenTypeSet, typeVariables) + + case pt: ParameterizedType => + encoderFor(pt.getRawType, seenTypeSet, JavaTypeUtils.getTypeArguments(pt).asScala.toMap) + case c: Class[_] => if (seenTypeSet.contains(c)) { throw 
ExecutionErrors.cannotHaveCircularReferencesInBeanClassError(c) @@ -124,7 +133,7 @@ object JavaTypeInference { // Note that the fields are ordered by name. val fields = properties.map { property => val readMethod = property.getReadMethod - val encoder = encoderFor(readMethod.getGenericReturnType, seenTypeSet + c) + val encoder = encoderFor(readMethod.getGenericReturnType, seenTypeSet + c, typeVariables) // The existence of `javax.annotation.Nonnull`, means this field is not nullable. val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull]) EncoderField( @@ -147,59 +156,4 @@ object JavaTypeInference { .filterNot(_.getName == "declaringClass") .filter(_.getReadMethod != null) } - - private class ImplementsGenericInterface(interface: Class[_]) { - assert(interface.isInterface) - assert(interface.getTypeParameters.nonEmpty) - - def unapply(t: Type): Option[(Class[_], Array[Type])] = implementsInterface(t).map { cls => - cls -> findTypeArgumentsForInterface(t) - } - - @tailrec - private def implementsInterface(t: Type): Option[Class[_]] = t match { - case pt: ParameterizedType => implementsInterface(pt.getRawType) - case c: Class[_] if interface.isAssignableFrom(c) => Option(c) - case _ => None - } - - private def findTypeArgumentsForInterface(t: Type): Array[Type] = { - val queue = new ArrayDeque[(Type, Map[Any, Type])] - queue.add(t -> Map.empty) - while (!queue.isEmpty) { - queue.poll() match { - case (pt: ParameterizedType, bindings) => - // translate mappings... 
- val mappedTypeArguments = pt.getActualTypeArguments.map { - case v: TypeVariable[_] => bindings(v.getName) - case v => v - } - if (pt.getRawType == interface) { - return mappedTypeArguments - } else { - val mappedTypeArgumentMap = mappedTypeArguments - .zipWithIndex.map(_.swap) - .toMap[Any, Type] - queue.add(pt.getRawType -> mappedTypeArgumentMap) - } - case (c: Class[_], indexedBindings) => - val namedBindings = c.getTypeParameters.zipWithIndex.map { - case (parameter, index) => - parameter.getName -> indexedBindings(index) - }.toMap[Any, Type] - val superClass = c.getGenericSuperclass - if (superClass != null) { - queue.add(superClass -> namedBindings) - } - c.getGenericInterfaces.foreach { iface => - queue.add(iface -> namedBindings) - } - } - } - throw ExecutionErrors.unreachableError() - } - } - - private object ImplementsList extends ImplementsGenericInterface(classOf[JList[_]]) - private object ImplementsMap extends ImplementsGenericInterface(classOf[JMap[_, _]]) } diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/JavaBeanWithGenerics.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/JavaBeanWithGenerics.java new file mode 100644 index 00000000000..b84a3122cf8 --- /dev/null +++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/JavaBeanWithGenerics.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst; + +class JavaBeanWithGenerics<T,A> { + private A attribute; + + private T value; + + public A getAttribute() { + return attribute; + } + + public void setAttribute(A attribute) { + this.attribute = attribute; + } + + public T getValue() { + return value; + } + + public void setValue(T value) { + this.value = value; + } +} + diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/JavaTypeInferenceSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/JavaTypeInferenceSuite.scala index 35f5bf739bf..64399976097 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/JavaTypeInferenceSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/JavaTypeInferenceSuite.scala @@ -66,6 +66,7 @@ class LeafBean { @BeanProperty var period: java.time.Period = _ @BeanProperty var enum: java.time.Month = _ @BeanProperty val readOnlyString = "read-only" + @BeanProperty var genericNestedBean: JavaBeanWithGenerics[String, String] = _ var nonNullString: String = "value" @javax.annotation.Nonnull @@ -184,6 +185,9 @@ class JavaTypeInferenceSuite extends SparkFunSuite { encoderField("date", STRICT_DATE_ENCODER), encoderField("duration", DayTimeIntervalEncoder), encoderField("enum", JavaEnumEncoder(classTag[java.time.Month])), + encoderField("genericNestedBean", JavaBeanEncoder( + ClassTag(classOf[JavaBeanWithGenerics[String, String]]), + Seq(encoderField("attribute", StringEncoder), encoderField("value", StringEncoder)))), encoderField("instant", 
STRICT_INSTANT_ENCODER), encoderField("localDate", STRICT_LOCAL_DATE_ENCODER), encoderField("localDateTime", LocalDateTimeEncoder), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org