ahshahid commented on code in PR #48158:
URL: https://github.com/apache/spark/pull/48158#discussion_r1771708744


##########
sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala:
##########
@@ -117,65 +130,144 @@ object JavaTypeInference {
         .asInstanceOf[UserDefinedType[Any]]
       UDTEncoder(udt, udt.getClass)
 
-    case c: Class[_] if c.isArray =>
+    case c: Class[_] if !forGenericBound && c.isArray =>
       val elementEncoder = encoderFor(c.getComponentType, seenTypeSet, 
typeVariables)
       ArrayEncoder(elementEncoder, elementEncoder.nullable)
 
-    case c: Class[_] if classOf[JList[_]].isAssignableFrom(c) =>
+    case c: Class[_] if !forGenericBound && 
classOf[JList[_]].isAssignableFrom(c) =>
       val element = encoderFor(c.getTypeParameters.array(0), seenTypeSet, 
typeVariables)
       IterableEncoder(ClassTag(c), element, element.nullable, 
lenientSerialization = false)
 
-    case c: Class[_] if classOf[JSet[_]].isAssignableFrom(c) =>
+    case c: Class[_] if !forGenericBound && 
classOf[JSet[_]].isAssignableFrom(c) =>
       val element = encoderFor(c.getTypeParameters.array(0), seenTypeSet, 
typeVariables)
       IterableEncoder(ClassTag(c), element, element.nullable, 
lenientSerialization = false)
 
-    case c: Class[_] if classOf[JMap[_, _]].isAssignableFrom(c) =>
+    case c: Class[_] if !forGenericBound && classOf[JMap[_, 
_]].isAssignableFrom(c) =>
       val keyEncoder = encoderFor(c.getTypeParameters.array(0), seenTypeSet, 
typeVariables)
       val valueEncoder = encoderFor(c.getTypeParameters.array(1), seenTypeSet, 
typeVariables)
       MapEncoder(ClassTag(c), keyEncoder, valueEncoder, valueEncoder.nullable)
 
     case tv: TypeVariable[_] =>
-      encoderFor(typeVariables(tv), seenTypeSet, typeVariables)
+      typeVariables.get(tv) match {
+        case Some(knownType) => encoderFor(knownType, seenTypeSet, 
typeVariables)
+
+        case None =>
+          // Concrete type unknown, check if the bounds can be used to 
identify a
+          // UDTEncoder and if exception is thrown check if the type extends 
Serializable
+          val concreteBound = tv.getBounds.collectFirst { case cls: Class[_] 
=> cls }
+          tv.getBounds
+            .flatMap(getAllSuperClasses)
+            .foldLeft[(Option[AgnosticEncoder[_]], 
Option[AgnosticEncoder[_]])](None -> None) {
+              case (r @ (Some(_), _), _) => r
 
-    case pt: ParameterizedType =>
+              case (r @ (None, serEncoder), bound) =>
+                Try(encoderFor(bound, seenTypeSet, typeVariables, 
forGenericBound = true)) match {
+                  case Success(value) => (Option(value), serEncoder)
+
+                  case Failure(UseSerializationEncoder(encoderProvider)) if 
serEncoder.isEmpty =>
+                    None -> 
Option(encoderProvider(concreteBound.getOrElse(bound)))
+
+                  case Failure(_) => r
+                }
+            } match {
+            case (Some(encd), _) => encd
+
+            case (None, Some(encd)) => encd
+
+            case _ => throw 
ExecutionErrors.cannotFindEncoderForTypeError(t.getTypeName)
+          }
+      }
+
+    case pt: ParameterizedType if !forGenericBound =>
       encoderFor(pt.getRawType, seenTypeSet, 
JavaTypeUtils.getTypeArguments(pt).asScala.toMap)
 
-    case c: Class[_] =>
+    case c: Class[_] if !forGenericBound =>
       if (seenTypeSet.contains(c)) {
         throw ExecutionErrors.cannotHaveCircularReferencesInBeanClassError(c)
       }
 
-      // TODO: we should only collect properties that have getter and setter. 
However, some tests
-      //   pass in scala case class as java bean class which doesn't have 
getter and setter.
+      // TODO: we should only collect properties that have getter and setter. 
However, some
+      //  tests pass in scala case class as java bean class which doesn't have 
getter and
+      //  setter.
       val properties = getJavaBeanReadableProperties(c)
-      // add type variables from inheritance hierarchy of the class
-      val classTV = JavaTypeUtils.getTypeArguments(c, 
classOf[Object]).asScala.toMap ++
-        typeVariables
-      // Note that the fields are ordered by name.
-      val fields = properties.map { property =>
-        val readMethod = property.getReadMethod
-        val encoder = encoderFor(readMethod.getGenericReturnType, seenTypeSet 
+ c, classTV)
-        // The existence of `javax.annotation.Nonnull`, means this field is 
not nullable.
-        val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull])
-        EncoderField(
-          property.getName,
-          encoder,
-          encoder.nullable && !hasNonNull,
-          Metadata.empty,
-          Option(readMethod.getName),
-          Option(property.getWriteMethod).map(_.getName))
+      // if the properties is empty and this is not a top level enclosing 
class, then we
+      // should not consider class as bean, as otherwise it will be treated as 
empty schema
+      // and loose the data on deser.
+      if (properties.isEmpty && seenTypeSet.nonEmpty) {
+        if (classOf[KryoSerializable].isAssignableFrom(c)) {
+          Encoders.kryo(c).asInstanceOf[AgnosticEncoder[_]]
+        } else if (classOf[java.io.Serializable].isAssignableFrom(c)) {
+          Encoders.javaSerialization(c).asInstanceOf[AgnosticEncoder[_]]
+        } else {
+          throw ExecutionErrors.cannotFindEncoderForTypeError(t.getTypeName)
+        }
+      } else {
+        // add type variables from inheritance hierarchy of the class
+        val parentClassesTypeMap =
+          JavaTypeUtils.getTypeArguments(c, classOf[Object]).asScala.toMap
+        val classTV = parentClassesTypeMap ++ typeVariables
+        // Note that the fields are ordered by name.
+        val fields = properties.map { property =>
+          val readMethod = property.getReadMethod
+          val methodReturnType = readMethod.getGenericReturnType
+          val encoder = encoderFor(methodReturnType, seenTypeSet + c, classTV)
+          // The existence of `javax.annotation.Nonnull`, means this field is 
not nullable.
+          val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull])
+          EncoderField(
+            property.getName,
+            encoder,
+            encoder.nullable && !hasNonNull,
+            Metadata.empty,
+            Option(readMethod.getName),
+            Option(property.getWriteMethod).map(_.getName))
+        }
+        // implies it cannot be assumed a BeanClass.
+        // Check if its super class or interface could be represented by an 
Encoder
+
+        JavaBeanEncoder(ClassTag(c), fields.toImmutableArraySeq)
       }
-      JavaBeanEncoder(ClassTag(c), fields.toImmutableArraySeq)
 
-    case _ =>
-      throw ExecutionErrors.cannotFindEncoderForTypeError(t.toString)
+    case t =>
+      throw ExecutionErrors.cannotFindEncoderForTypeError(t.getTypeName)
   }
 
+  private def getAllSuperClasses(typee: Type): Array[Class[_]] = Option(typee)
+    .map({
+      case clazz: Class[_] =>
+        val currentInterfaces = clazz.getInterfaces
+        currentInterfaces ++ (clazz.getSuperclass +: 
currentInterfaces).flatMap(clz =>
+          getAllSuperClasses(clz)) :+ clazz
+
+      case tv: TypeVariable[_] =>
+        tv.getBounds.foldLeft(Array.empty[Class[_]]) { case (res, bnd) =>
+          res ++ getAllSuperClasses(bnd)
+        }
+    })
+    .getOrElse(Array.empty)
+
   def getJavaBeanReadableProperties(beanClass: Class[_]): 
Array[PropertyDescriptor] = {
     val beanInfo = Introspector.getBeanInfo(beanClass)
     beanInfo.getPropertyDescriptors
       .filterNot(_.getName == "class")
       .filterNot(_.getName == "declaringClass")
       .filter(_.getReadMethod != null)
   }
+
+  object UseSerializationEncoder {
+    def unapply(th: Throwable): Option[Class[_] => AgnosticEncoder[_]] = th 
match {

Review Comment:
   Unifying it with the other Java serialization code is something I intend to
do in my refactoring.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to