So I just added the flink-avro dependency but didn't change the getProducedType method, and it worked fine. Would you expect that to be the case?
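For reference, pulling in flink-avro with sbt would look roughly like the sketch below; the org.apache.flink flink-avro coordinates are the module discussed in this thread, but the version shown is an assumption and should match your Flink release. A plausible explanation for the error disappearing without touching getProducedType is that Flink registers Avro-aware Kryo handling (including for GenericData.Array, the class in the stacktrace below) once flink-avro is on the classpath.

    // build.sbt (sketch): add the flink-avro module discussed in this thread.
    // The version is illustrative; use the one matching your cluster.
    val flinkVersion = "1.4.0"
    libraryDependencies += "org.apache.flink" % "flink-avro" % flinkVersion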
On Fri, Jan 5, 2018 at 5:43 PM Aljoscha Krettek <aljos...@apache.org> wrote:

> Yes, that should do the trick.
>
> On 5. Jan 2018, at 18:37, Kyle Hamlin <hamlin...@gmail.com> wrote:
>
> I can add that dependency. So I would replace
>
>   override def getProducedType: TypeInformation[T] = {
>     TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
>   }
>
> with something like:
>
>   override def getProducedType: TypeInformation[T] = {
>     new AvroTypeInfo(classOf[T])
>   }
>
> On Thu, Jan 4, 2018 at 11:08 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>>
>> I think you might be able to use AvroTypeInfo, which you get by
>> including the flink-avro dependency. Is that an option for you?
>>
>> Best,
>> Aljoscha
>>
>> On 3. Jan 2018, at 21:34, Kyle Hamlin <hamlin...@gmail.com> wrote:
>>
>> Hi,
>>
>> It appears that Kryo can't properly extract/deserialize Avro array types.
>> I have a very simple Avro schema that has an array type, and when I remove
>> the array field the error is not thrown. Is there any way around this
>> without using a specific type?
>>
>> *Avro Schema:*
>>
>>   {
>>     "type": "record",
>>     "name": "Recieved",
>>     "fields": [
>>       {"name": "id", "type": "int"},
>>       {"name": "time", "type": "long"},
>>       {"name": "urls", "type": {"type": "array", "items": "string"}}
>>     ]
>>   }
>>
>> *Deserializer:*
>>
>>   import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}
>>   import org.apache.avro.generic.{GenericData, GenericRecord}
>>   import org.apache.flink.api.common.typeinfo.TypeInformation
>>   import org.apache.flink.api.java.typeutils.TypeExtractor
>>   import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
>>
>>   import scala.collection.JavaConverters._
>>   import scala.reflect.ClassTag
>>
>>   class AvroDeserializer[T <: GenericRecord : ClassTag](schemaRegistryUrl: String)
>>       extends KeyedDeserializationSchema[T] {
>>
>>     @transient lazy val keyDeserializer: KafkaAvroDeserializer = {
>>       val deserializer = new KafkaAvroDeserializer()
>>       deserializer.configure(
>>         Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
>>         true)
>>       deserializer
>>     }
>>
>>     // Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
>>     @transient lazy val valueDeserializer: KafkaAvroDeserializer = {
>>       val deserializer = new KafkaAvroDeserializer()
>>       deserializer.configure(
>>         // other schema-registry configuration parameters can be passed, see the configure() code
>>         // for details (among other things, schema cache size)
>>         Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
>>         false)
>>       deserializer
>>     }
>>
>>     override def deserialize(messageKey: Array[Byte], message: Array[Byte],
>>                              topic: String, partition: Int, offset: Long): T = {
>>       valueDeserializer.deserialize(topic, message).asInstanceOf[T]
>>     }
>>
>>     override def isEndOfStream(nextElement: T): Boolean = false
>>
>>     override def getProducedType: TypeInformation[T] = {
>>       TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
>>     }
>>   }
>>
>> *Stacktrace:*
>>
>>   Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
>>   Using address localhost:6123 to connect to JobManager.
>>   JobManager web interface address http://localhost:8082
>>   Starting execution of program
>>   Submitting job with JobID: d9ed8f58fceaae253b84fc86e4b6fa3a. Waiting for job completion.
>>   Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
>>   01/03/2018 15:19:57 Job execution switched to status RUNNING.
>>   01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to SCHEDULED
>>   01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to DEPLOYING
>>   01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to RUNNING
>>   01/03/2018 15:19:59 Source: Kafka -> Sink: Unnamed(1/1) switched to FAILED
>>   com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
>>   Serialization trace:
>>   values (org.apache.avro.generic.GenericData$Record)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:355)
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:85)
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:624)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>     at java.lang.Thread.run(Thread.java:748)
>>   Caused by: java.lang.NullPointerException
>>     at org.apache.avro.generic.GenericData$Array.add(GenericData.java:277)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
>>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>     ... 20 more
>>
>>   01/03/2018 15:19:59 Job execution switched to status FAILING.
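For completeness, a minimal sketch of the getProducedType idea discussed above, with two caveats not spelled out in the thread: classOf[T] does not compile for an abstract type parameter, so the runtime class has to be recovered from the ClassTag; and Flink's AvroTypeInfo is bounded to Avro specific records (SpecificRecordBase), so the sketch assumes a specific-record type rather than GenericRecord. The class name SpecificAvroTypeHint is made up for illustration.

    // Sketch only, not the thread's final code.
    import org.apache.avro.specific.SpecificRecordBase
    import org.apache.flink.api.common.typeinfo.TypeInformation
    // AvroTypeInfo ships with the flink-avro module; this is its Flink 1.4 package.
    import org.apache.flink.formats.avro.typeutils.AvroTypeInfo

    import scala.reflect.ClassTag

    class SpecificAvroTypeHint[T <: SpecificRecordBase : ClassTag] {
      // classOf[T] would not compile here; the ClassTag supplies the runtime class.
      def producedType: TypeInformation[T] =
        new AvroTypeInfo(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
    }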