Hi Ana, I think you will need to deal with ClassTag to keep all the code generic. I've found such example which should help:
https://github.com/amzn/milan/blob/7dfa29b434ced7eef286ea34c5085c10c1b787b6/milan/milan-compilers/milan-flink-compiler/src/main/scala/com/amazon/milan/compiler/flink/serialization/JsonDeserializationSchema.scala object JsonDeserializationSchema { private val objectMapper = JsonMapper.builder().addModule(DefaultScalaModule).build()} class JsonDeserializationSchema[T: ClassTag] extends DeserializationSchema[T ] { override def deserialize(bytes: Array[Byte]): T = JsonDeserializationSchema. objectMapper.readValue[T](bytes, classTag[T].runtimeClass.asInstanceOf[Class [T]]) override def getProducedType: TypeInformation[T] = TypeExtractor. getForClass(classTag[T].runtimeClass.asInstanceOf[Class[T]]) ... } ----------- Alexey On Mon, Mar 6, 2023 at 9:58 PM Ana Gómez González <angog...@gmail.com> wrote: > > Hello! > > First time emailing one doubt to this mailing list, hope I'm not messing > anything up. > I'm not fully sure if what I want to do it's conceptually correct, so pls > let me know. > > I want to create a generic class that extends a DeserializationSchema. I > want an easy way of creating different deserialization schemas for my > rabbitMQ sources from JSON to scala case classes. > > My first approach looks like this: > > import com.fasterxml.jackson.databind.json.JsonMapper > import com.fasterxml.jackson.module.scala.DefaultScalaModule > import org.apache.flink.api.common.serialization.DeserializationSchema > import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation} > > class GenericJsonSchema[T] extends DeserializationSchema[T] { > > private val typeInformation: TypeInformation[T] = TypeInformation.of(new > TypeHint[T] {}) > private val objectMapper: JsonMapper = JsonMapper.builder() > .addModule(DefaultScalaModule) > .build() > > @throws(classOf[IOException]) > def deserialize(message: Array[Byte]): T = objectMapper.readValue(message, > typeInformation.getTypeClass) > > def isEndOfStream(nextElement: T): Boolean = false > > def getProducedType: TypeInformation[T] = typeInformation > } > > > When running I obtain: > > > *Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: > The TypeHint is using a generic variable.This is not supported, generic > types must be fully specified for the TypeHint.* > > I've read and tried to understand all the problems when using generic > types and TypeInformation class, but I don't get the correct use or if it > can be used for my purpose. > > > Thanks a lot in advance > > > *Ana Gómez González* > <http://twitter.com/angoglez> <https://www.linkedin.com/in/angoglez/> >