Exception while deserializing in kafka streams
Hello, I get the below exception when deserilaizing avro records using KafkaAvroDeserializer. 16/09/14 17:24:39 INFO StreamThread: Stream thread shutdown complete [StreamThread-1] Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 4 Caused by: java.lang.NullPointerException at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer. deserialize(AbstractKafkaAvroDeserializer.java:120) at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer. deserialize(AbstractKafkaAvroDeserializer.java:92) I can confirm that schema registry URL is accessible and url/schemas/ids/4 does return valid schema. May be some initialization didn't happen correctly? props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, " 192.168.50.6:8081") props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long.getClass.getName) props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[ GenericAvroSerdeWithSchemaRegistry]) GenericAvroSerdeWithSchemaRegistry code --> https://www.dropbox.com/s/ y471k9nj94tlxro/avro_serde.txt?dl=0 Walter
Re: Exception while deserializing in kafka streams
Guozhang, I am using 0.10.0.0. Could the below log be the cause? 16/09/14 17:24:35 WARN ConsumerConfig: The configuration schema.registry.url = http://192.168.50.6: <http://192.168.50.6:8081/>8081 was supplied but isn't a known config. 16/09/14 17:24:35 INFO AppInfoParser: Kafka version : 0.10.0.0 16/09/14 17:24:35 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13 Walter On Wed, Sep 14, 2016 at 8:11 PM, Guozhang Wang <wangg...@gmail.com> wrote: > Hello Walter, > > Which version of Kafka were you using? > > I ask this because there was a bug causing the serde passed through config > to NOT being configured when constructed: > https://issues.apache.org/jira/browse/KAFKA-3639 > > > Which is fixed in the 0.10.0.0 release, which means you will only hit it if > you are using the tech-preview release version. > > > Guozhang > > > On Wed, Sep 14, 2016 at 11:10 AM, Walter rakoff <walter.rak...@gmail.com> > wrote: > > > Hello, > > > > I get the below exception when deserilaizing avro records > > using KafkaAvroDeserializer. > > > > 16/09/14 17:24:39 INFO StreamThread: Stream thread shutdown complete > > [StreamThread-1] > > Exception in thread "StreamThread-1" > > org.apache.kafka.common.errors.SerializationException: > > Error deserializing Avro message for id 4 > > Caused by: java.lang.NullPointerException > > at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer. > > deserialize(AbstractKafkaAvroDeserializer.java:120) > > at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer. > > deserialize(AbstractKafkaAvroDeserializer.java:92) > > > > I can confirm that schema registry URL is accessible and > url/schemas/ids/4 > > does return valid schema. > > May be some initialization didn't happen correctly? > > > > props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, " > > 192.168.50.6:8081") > > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > > Serdes.Long.getClass.getName) > > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[ > > GenericAvroSerdeWithSchemaRegistry]) > > > > GenericAvroSerdeWithSchemaRegistry code --> https://www.dropbox.com/s/ > > y471k9nj94tlxro/avro_serde.txt?dl=0 > > > > Walter > > > > > > -- > -- Guozhang >
Re: Exception while deserializing in kafka streams
Guozhang, Any clues on this one? Walter On Wed, Sep 14, 2016 at 9:46 PM, Walter rakoff <walter.rak...@gmail.com> wrote: > Guozhang, > > I am using 0.10.0.0. Could the below log be the cause? > > 16/09/14 17:24:35 WARN ConsumerConfig: The configuration > schema.registry.url = http://192.168.50.6: <http://192.168.50.6:8081/>8081 > was supplied but isn't a known config. > 16/09/14 17:24:35 INFO AppInfoParser: Kafka version : 0.10.0.0 > 16/09/14 17:24:35 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13 > > Walter > > > On Wed, Sep 14, 2016 at 8:11 PM, Guozhang Wang <wangg...@gmail.com> wrote: > >> Hello Walter, >> >> Which version of Kafka were you using? >> >> I ask this because there was a bug causing the serde passed through config >> to NOT being configured when constructed: >> https://issues.apache.org/jira/browse/KAFKA-3639 >> >> >> Which is fixed in the 0.10.0.0 release, which means you will only hit it >> if >> you are using the tech-preview release version. >> >> >> Guozhang >> >> >> On Wed, Sep 14, 2016 at 11:10 AM, Walter rakoff <walter.rak...@gmail.com> >> wrote: >> >> > Hello, >> > >> > I get the below exception when deserilaizing avro records >> > using KafkaAvroDeserializer. >> > >> > 16/09/14 17:24:39 INFO StreamThread: Stream thread shutdown complete >> > [StreamThread-1] >> > Exception in thread "StreamThread-1" >> > org.apache.kafka.common.errors.SerializationException: >> > Error deserializing Avro message for id 4 >> > Caused by: java.lang.NullPointerException >> > at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer >> . >> > deserialize(AbstractKafkaAvroDeserializer.java:120) >> > at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer >> . >> > deserialize(AbstractKafkaAvroDeserializer.java:92) >> > >> > I can confirm that schema registry URL is accessible and >> url/schemas/ids/4 >> > does return valid schema. >> > May be some initialization didn't happen correctly? >> > >> > props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, >> " >> > 192.168.50.6:8081") >> > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, >> > Serdes.Long.getClass.getName) >> > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[ >> > GenericAvroSerdeWithSchemaRegistry]) >> > >> > GenericAvroSerdeWithSchemaRegistry code --> https://www.dropbox.com/s/ >> > y471k9nj94tlxro/avro_serde.txt?dl=0 >> > >> > Walter >> > >> >> >> >> -- >> -- Guozhang >> > >
Re: Exception while deserializing in kafka streams
Guozhang, I tried your suggestion. Below is the log from Serde, Serializer & Deserializer. Confirmed that KafkaAvroDeserializer.configure does get invoked. Line 379: 16/09/22 15:28:46 WARN GenericAvroSerdeWithSchemaRegistry: In > configure {num.standby.replicas=1, replication.factor=3, > commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092, > schema.registry.url=http://10.200.184.41:8081, > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde, > zookeeper.connect=10.200.184.26:2181, value.serde=class > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry, > auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test, > application.id=testing-app} > Line 380: 16/09/22 15:28:46 WARN GenericAvroSerializerWithSchemaRegistry: > In configure{num.standby.replicas=1, replication.factor=3, > commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092, > schema.registry.url=http://10.200.184.41:8081, > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde, > zookeeper.connect=10.200.184.26:2181, value.serde=class > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry, > auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test, > application.id=testing-app} > Line 385: 16/09/22 15:28:46 WARN > GenericAvroDeserializerWithSchemaRegistry: In > configure{num.standby.replicas=1, replication.factor=3, > commit.interval.ms=125000, > bootstrap.servers=10.200.184.29:9092, schema.registry.url= > http://10.200.184.41:8081, > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde, > zookeeper.connect=10.200.184.26:2181, value.serde=class > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry, > auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test, > application.id=testing-app} Still the same exception 16/09/22 15:28:47 INFO StreamThread: Stream thread shutdown complete > [StreamThread-1] > Exception in thread "StreamThread-1" > org.apache.kafka.common.errors.SerializationException: Error deserializing > Avro message for id 7 > Caused by: java.lang.NullPointerException > at > io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:120) > at > io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92) Walter On Mon, Sep 19, 2016 at 8:25 PM, Guozhang Wang <wangg...@gmail.com> wrote: > Hello Walter, > > The WARN log entry should not be the cause of this issue. > > I double checked the 0.10.0.0 release and this issue should not really > happen, so your observation is a bit weird to me. Could your add a log > entry in the `configure` function which constructs the registry client to > make sure it is indeed triggered when the streams app start up? > > > Guozhang > > > > On Fri, Sep 16, 2016 at 2:27 PM, Walter rakoff <walter.rak...@gmail.com> > wrote: > > > Guozhang, > > > > Any clues on this one? > > > > Walter > > > > On Wed, Sep 14, 2016 at 9:46 PM, Walter rakoff <walter.rak...@gmail.com> > > wrote: > > > > > Guozhang, > > > > > > I am using 0.10.0.0. Could the below log be the cause? > > > > > > 16/09/14 17:24:35 WARN ConsumerConfig: The configuration > > > schema.registry.url = http://192.168.50.6: <http://192.168.50.6:8081/> > > 8081 > > > was supplied but isn't a known config. > > > 16/09/14 17:24:35 INFO AppInfoParser: Kafka version : 0.10.0.0 > > > 16/09/14 17:24:35 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13 > > > > > > Walter > > > > > > > > > On Wed, Sep 14, 2016 at 8:11 PM, Guozhang Wang <wangg...@gmail.com> > > wrote: > > > > > >> Hello Walter, > > >> > > >> Which version of Kafka were you using? > > >> > > >> I ask this because there was a bug causing the serde passed through > > config > > >> to NOT being configured when constructed: > > >> https://issues.apache.org/jira/browse/KAFKA-3639 > > >> > > >> > > >> Which is fixed in the 0.10.0.0 release, which means you will only hit > it > > >> if > > >> you are using the tech-preview release version. > > >> > > >> > > >> Guozhang > > >> > > >> > > >> On Wed, Sep 14, 2016 at 11:10 AM, Walter rakoff < > > walter.rak...@gmail.com> > > >> wrote: > > >> > > >> > Hello, > > >> > > > >> > I get the below exception when deserilaizing avro records >
Re: Exception while deserializing in kafka streams
Ah, that was it. I was passing the same Serde while creating the topology. It works after I removed it. Thanks! Walter On Mon, Sep 26, 2016 at 1:16 PM, Guozhang Wang <wangg...@gmail.com> wrote: > Hi Walter, > > One thing I can think of is that, if you pass the serde object as part of > your topology definition, instead of passing the serde class in the config, > then these serde objects will not be auto configured and hence for your > case the schema registry client will not be constructed and initialized. > > https://issues.apache.org/jira/browse/KAFKA-3729 > > So in case your application's topology does overwrite serdes with direct > serde object passing, you need to configure them manually for now. > > > Guozhang > > On Thu, Sep 22, 2016 at 5:36 PM, Guozhang Wang <wangg...@gmail.com> wrote: > > > Hi Walter, > > > > I downloaded the 0.10.0 jar and verified that the configure() function is > > auto-triggered when you get the serde classes from `context.keySerde / > > valueSerde`, which is auto-triggered if you use the DSL. And your Scala > > code is the same as to our examples code: > > > > https://github.com/confluentinc/examples/blob/ > > 030343f9acbb9f73a13ab0f0fd31690dca97c606/kafka-streams/src/ > > main/java/io/confluent/examples/streams/utils/GenericAvroSerializer.java > > > > > > Which demo example were you running? And are there any other jars > > co-located with the 0.10.0.0 jar that could cause another class be > loaded? > > > > > > Guozhang > > > > > > On Thu, Sep 22, 2016 at 8:41 AM, Walter rakoff <walter.rak...@gmail.com> > > wrote: > > > >> Guozhang, > >> > >> I tried your suggestion. Below is the log from Serde, Serializer > >> & Deserializer. > >> Confirmed that KafkaAvroDeserializer.configure does get invoked. > >> > >> Line 379: 16/09/22 15:28:46 WARN GenericAvroSerdeWithSchemaRegistry: In > >> > configure {num.standby.replicas=1, replication.factor=3, > >> > commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092, > >> > schema.registry.url=http://10.200.184.41:8081, > >> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde, > >> > zookeeper.connect=10.200.184.26:2181, value.serde=class > >> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry, > >> > auto.offset.reset=earliest, num.stream.threads=1, client.id > >> =KStreams-Test, > >> > application.id=testing-app} > >> > Line 380: 16/09/22 15:28:46 WARN GenericAvroSerializerWithSchem > >> aRegistry: > >> > In configure{num.standby.replicas=1, replication.factor=3, > >> > commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092, > >> > schema.registry.url=http://10.200.184.41:8081, > >> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde, > >> > zookeeper.connect=10.200.184.26:2181, value.serde=class > >> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry, > >> > auto.offset.reset=earliest, num.stream.threads=1, client.id > >> =KStreams-Test, > >> > application.id=testing-app} > >> > Line 385: 16/09/22 15:28:46 WARN > >> > GenericAvroDeserializerWithSchemaRegistry: In > >> > configure{num.standby.replicas=1, replication.factor=3, > >> commit.interval.ms=125000, > >> > bootstrap.servers=10.200.184.29:9092, schema.registry.url= > >> > http://10.200.184.41:8081, > >> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde, > >> > zookeeper.connect=10.200.184.26:2181, value.serde=class > >> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry, > >> > auto.offset.reset=earliest, num.stream.threads=1, client.id > >> =KStreams-Test, > >> > application.id=testing-app} > >> > >> > >> Still the same exception > >> > >> 16/09/22 15:28:47 INFO StreamThread: Stream thread shutdown complete > >> > [StreamThread-1] > >> > Exception in thread "StreamThread-1" > >> > org.apache.kafka.common.errors.SerializationException: Error > >> deserializing > >> > Avro message for id 7 > >> > Caused by: java.lang.NullPointerException > >> > at > >> > io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer > >> .deserialize(AbstractKafkaAvroDeserializer.java:120) > >> > at > >> > io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer > >> .d