Hi Ana,

I think you will need to deal with ClassTag to keep all the code generic.
I've found such example which should help:

https://github.com/amzn/milan/blob/7dfa29b434ced7eef286ea34c5085c10c1b787b6/milan/milan-compilers/milan-flink-compiler/src/main/scala/com/amazon/milan/compiler/flink/serialization/JsonDeserializationSchema.scala

object JsonDeserializationSchema {
  private val objectMapper =
JsonMapper.builder().addModule(DefaultScalaModule).build()}


class JsonDeserializationSchema[T: ClassTag] extends DeserializationSchema[T
] {
override def deserialize(bytes: Array[Byte]): T = JsonDeserializationSchema.
objectMapper.readValue[T](bytes, classTag[T].runtimeClass.asInstanceOf[Class
[T]]) override def getProducedType: TypeInformation[T] = TypeExtractor.
getForClass(classTag[T].runtimeClass.asInstanceOf[Class[T]])

  ...


}

-----------

Alexey

On Mon, Mar 6, 2023 at 9:58 PM Ana Gómez González <angog...@gmail.com>
wrote:

>
> Hello!
>
> First time emailing one doubt to this mailing list, hope I'm not messing
> anything up.
> I'm not fully sure if what I want to do it's conceptually correct, so pls
> let me know.
>
> I want to create a generic class that extends a DeserializationSchema. I
> want an easy way of creating different deserialization schemas for my
> rabbitMQ sources from JSON to scala case classes.
>
> My first approach looks like this:
>
> import com.fasterxml.jackson.databind.json.JsonMapper
> import com.fasterxml.jackson.module.scala.DefaultScalaModule
> import org.apache.flink.api.common.serialization.DeserializationSchema
> import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
>
> class GenericJsonSchema[T] extends DeserializationSchema[T] {
>
> private val typeInformation: TypeInformation[T] = TypeInformation.of(new
> TypeHint[T] {})
> private val objectMapper: JsonMapper = JsonMapper.builder()
> .addModule(DefaultScalaModule)
> .build()
>
> @throws(classOf[IOException])
> def deserialize(message: Array[Byte]): T = objectMapper.readValue(message,
> typeInformation.getTypeClass)
>
> def isEndOfStream(nextElement: T): Boolean = false
>
> def getProducedType: TypeInformation[T] = typeInformation
> }
>
>
> When running I obtain:
>
>
> *Exception in thread "main" org.apache.flink.util.FlinkRuntimeException:
> The TypeHint is using a generic variable.This is not supported, generic
> types must be fully specified for the TypeHint.*
>
> I've read and tried to understand all the problems when using generic
> types and TypeInformation class, but I don't get the correct use or if it
> can be used for my purpose.
>
>
> Thanks a lot in advance
>
>
> *Ana Gómez González*
> <http://twitter.com/angoglez> <https://www.linkedin.com/in/angoglez/>
>

Reply via email to