hvanhovell commented on code in PR #48158:
URL: https://github.com/apache/spark/pull/48158#discussion_r1771684833
##########
sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala:
##########
@@ -117,65 +130,144 @@ object JavaTypeInference {
.asInstanceOf[UserDefinedType[Any]]
UDTEncoder(udt, udt.getClass)
- case c: Class[_] if c.isArray =>
+ case c: Class[_] if !forGenericBound && c.isArray =>
val elementEncoder = encoderFor(c.getComponentType, seenTypeSet,
typeVariables)
ArrayEncoder(elementEncoder, elementEncoder.nullable)
- case c: Class[_] if classOf[JList[_]].isAssignableFrom(c) =>
+ case c: Class[_] if !forGenericBound &&
classOf[JList[_]].isAssignableFrom(c) =>
val element = encoderFor(c.getTypeParameters.array(0), seenTypeSet,
typeVariables)
IterableEncoder(ClassTag(c), element, element.nullable,
lenientSerialization = false)
- case c: Class[_] if classOf[JSet[_]].isAssignableFrom(c) =>
+ case c: Class[_] if !forGenericBound &&
classOf[JSet[_]].isAssignableFrom(c) =>
val element = encoderFor(c.getTypeParameters.array(0), seenTypeSet,
typeVariables)
IterableEncoder(ClassTag(c), element, element.nullable,
lenientSerialization = false)
- case c: Class[_] if classOf[JMap[_, _]].isAssignableFrom(c) =>
+ case c: Class[_] if !forGenericBound && classOf[JMap[_,
_]].isAssignableFrom(c) =>
val keyEncoder = encoderFor(c.getTypeParameters.array(0), seenTypeSet,
typeVariables)
val valueEncoder = encoderFor(c.getTypeParameters.array(1), seenTypeSet,
typeVariables)
MapEncoder(ClassTag(c), keyEncoder, valueEncoder, valueEncoder.nullable)
case tv: TypeVariable[_] =>
- encoderFor(typeVariables(tv), seenTypeSet, typeVariables)
+ typeVariables.get(tv) match {
+ case Some(knownType) => encoderFor(knownType, seenTypeSet,
typeVariables)
+
+ case None =>
+ // Concrete type unknown, check if the bounds can be used to
identify a
+ // UDTEncoder and if exception is thrown check if the type extends
Serializable
+ val concreteBound = tv.getBounds.collectFirst { case cls: Class[_]
=> cls }
+ tv.getBounds
+ .flatMap(getAllSuperClasses)
+ .foldLeft[(Option[AgnosticEncoder[_]],
Option[AgnosticEncoder[_]])](None -> None) {
+ case (r @ (Some(_), _), _) => r
- case pt: ParameterizedType =>
+ case (r @ (None, serEncoder), bound) =>
+ Try(encoderFor(bound, seenTypeSet, typeVariables,
forGenericBound = true)) match {
+ case Success(value) => (Option(value), serEncoder)
+
+ case Failure(UseSerializationEncoder(encoderProvider)) if
serEncoder.isEmpty =>
+ None ->
Option(encoderProvider(concreteBound.getOrElse(bound)))
+
+ case Failure(_) => r
+ }
+ } match {
+ case (Some(encd), _) => encd
+
+ case (None, Some(encd)) => encd
+
+ case _ => throw
ExecutionErrors.cannotFindEncoderForTypeError(t.getTypeName)
+ }
+ }
+
+ case pt: ParameterizedType if !forGenericBound =>
encoderFor(pt.getRawType, seenTypeSet,
JavaTypeUtils.getTypeArguments(pt).asScala.toMap)
- case c: Class[_] =>
+ case c: Class[_] if !forGenericBound =>
if (seenTypeSet.contains(c)) {
throw ExecutionErrors.cannotHaveCircularReferencesInBeanClassError(c)
}
- // TODO: we should only collect properties that have getter and setter.
However, some tests
- // pass in scala case class as java bean class which doesn't have
getter and setter.
+ // TODO: we should only collect properties that have getter and setter.
However, some
+ // tests pass in scala case class as java bean class which doesn't have
getter and
+ // setter.
val properties = getJavaBeanReadableProperties(c)
- // add type variables from inheritance hierarchy of the class
- val classTV = JavaTypeUtils.getTypeArguments(c,
classOf[Object]).asScala.toMap ++
- typeVariables
- // Note that the fields are ordered by name.
- val fields = properties.map { property =>
- val readMethod = property.getReadMethod
- val encoder = encoderFor(readMethod.getGenericReturnType, seenTypeSet
+ c, classTV)
- // The existence of `javax.annotation.Nonnull`, means this field is
not nullable.
- val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull])
- EncoderField(
- property.getName,
- encoder,
- encoder.nullable && !hasNonNull,
- Metadata.empty,
- Option(readMethod.getName),
- Option(property.getWriteMethod).map(_.getName))
+ // if the properties is empty and this is not a top level enclosing
class, then we
+ // should not consider class as bean, as otherwise it will be treated as
empty schema
+ // and loose the data on deser.
+ if (properties.isEmpty && seenTypeSet.nonEmpty) {
+ if (classOf[KryoSerializable].isAssignableFrom(c)) {
Review Comment:
This will be an issue for Connect. While the API supports Kryo, Connect
can't support Kryo in its current form. Either we have to detect whether we are
in Connect mode, or we have to just fall back to Java serialization.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]